Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vector exits after start when AMQP sink is unhealthy despite setting --require-healthy flag to false #19466

Open
mtrbpr opened this issue Dec 25, 2023 · 2 comments
Labels
type: bug A code related bug.

Comments

@mtrbpr
Copy link

mtrbpr commented Dec 25, 2023

A note for the community

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment

Problem

It seems that disabling sinks healthcheck via setting --require-healthy flag to false on startup has no effect when sink type is amqp and no vhost is specified in amqp connection_string , or the vhost does not exist at all.

  • Action:
vector --config vector.yaml --require-healthy false
  • Observed behavior: Vector exits a few moments later leaving following logs:
2023-12-25T16:00:48.881289Z  INFO vector::app: Log level is enabled. level="vector=info,codec=info,vrl=info,file_source=info,tower_limit=info,rdkafka=info,buffers=info,lapin=info,kube=info"
2023-12-25T16:00:48.885424Z  INFO vector::app: Loading configs. paths=["vecdor.yaml"]
2023-12-25T16:00:48.895033Z ERROR lapin::channel: Connection closed channel=0 method=Close { reply_code: 530, reply_text: ShortString("NOT_ALLOWED - vhost wrong not found"), class_id: 10, method_id: 40 } error=AMQPError { kind: Hard(NOTALLOWED), message: ShortString("NOT_ALLOWED - vhost wrong not found") }
2023-12-25T16:00:48.895108Z ERROR lapin::channels: Connection error error=protocol error: AMQP hard error: NOT-ALLOWED: NOT_ALLOWED - vhost wrong not found
2023-12-25T16:00:48.895189Z ERROR vector::topology::builder: Configuration error. error=Sink "amqp-sink": creating amqp producer failed: protocol error: AMQP hard error: NOT-ALLOWED: NOT_ALLOWED - vhost wrong not found
  • Expected behavior: Vector should keep running

Same thing happens when connection string points to a wrong port:

  • Observed log:
2023-12-25T16:16:55.233320Z  INFO vector::app: Log level is enabled. level="vector=info,codec=info,vrl=info,file_source=info,tower_limit=info,rdkafka=info,buffers=info,lapin=info,kube=info"
2023-12-25T16:16:55.235467Z  INFO vector::app: Loading configs. paths=["vecdor.yaml"]
2023-12-25T16:16:55.240910Z ERROR vector::topology::builder: Configuration error. error=Sink "amqp-sink": creating amqp producer failed: IO error: Connection refused (os error 111)

I tried setting wrong port on other sink types like kafka and elasticsearch and it didn't cause vector to crash so I presume what I'm reporting should be a bug related to amqp sink or the way vector is using it.

Configuration

sources:
  demo:
    type: demo_logs
    format: json

sinks:
  amqp-sink:
    connection_string: amqp://guest:[email protected]:5672/wrong
    encoding:
      codec: json
    exchange: queue
    inputs:
      - demo
    type: amqp

Version

vector 0.34.1 (x86_64-unknown-linux-gnu 86f1c22 2023-11-16 14:59:10.486846964)

Debug Output

2023-12-25T16:13:55.768432Z DEBUG vector::app: Internal log rate limit configured. internal_log_rate_secs=10
2023-12-25T16:13:55.768481Z  INFO vector::app: Log level is enabled. level="vector=trace,codec=trace,vrl=trace,file_source=trace,tower_limit=trace,rdkafka=trace,buffers=trace,lapin=trace,kube=trace"
2023-12-25T16:13:55.768571Z DEBUG vector::app: messaged="Building runtime." worker_threads=8
2023-12-25T16:13:55.770415Z  INFO vector::app: Loading configs. paths=["vecdor.yaml"]
2023-12-25T16:13:55.771797Z DEBUG vector::config::loading: No secret placeholder found, skipping secret resolution.
2023-12-25T16:13:55.772267Z DEBUG vector::topology::builder: Building new source. component=demo
2023-12-25T16:13:55.772501Z DEBUG vector::topology::builder: Building new sink. component=amqp-sink
2023-12-25T16:13:55.773720Z DEBUG sink{component_kind="sink" component_id=amqp-sink component_type=amqp}: lapin::channels: create channel id=0
2023-12-25T16:13:55.773824Z TRACE sink{component_kind="sink" component_id=amqp-sink component_type=amqp}: lapin::channel: send_frame channel=0
2023-12-25T16:13:55.773862Z TRACE sink{component_kind="sink" component_id=amqp-sink component_type=amqp}: lapin::channel: wake channel=0
2023-12-25T16:13:55.775209Z TRACE lapin::io_loop: io_loop run
2023-12-25T16:13:55.775236Z TRACE lapin::socket_state: Got event for socket event=Wake
2023-12-25T16:13:55.775244Z TRACE lapin::socket_state: Got event for socket event=Wake
2023-12-25T16:13:55.775252Z TRACE lapin::io_loop: io_loop do_run can_read=true can_write=true has_data=true
2023-12-25T16:13:55.775262Z TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::ProtocolHeader(0.9.1)
2023-12-25T16:13:55.775296Z TRACE lapin::io_loop: wrote 8 bytes
2023-12-25T16:13:55.775334Z TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial
2023-12-25T16:13:55.775341Z TRACE lapin::io_loop: io_loop run
2023-12-25T16:13:55.775348Z TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
2023-12-25T16:13:55.776116Z TRACE lapin::socket_state: Got event for socket event=Readable
2023-12-25T16:13:55.776136Z TRACE lapin::io_loop: read 517 bytes
2023-12-25T16:13:55.776208Z TRACE lapin::channels: will handle frame frame=Method(0, Connection(Start(Start { version_major: 0, version_minor: 9, server_properties: FieldTable({ShortString("capabilities"): FieldTable(FieldTable({ShortString("authentication_failure_close"): Boolean(true), ShortString("basic.nack"): Boolean(true), ShortString("connection.blocked"): Boolean(true), ShortString("consumer_cancel_notify"): Boolean(true), ShortString("consumer_priorities"): Boolean(true), ShortString("direct_reply_to"): Boolean(true), ShortString("exchange_exchange_bindings"): Boolean(true), ShortString("per_consumer_qos"): Boolean(true), ShortString("publisher_confirms"): Boolean(true)})), ShortString("cluster_name"): LongString(LongString([114, 97, 98, 98, 105, 116, 64, 56, 52, 53, 98, 97, 52, 99, 55, 100, 97, 54, 57])), ShortString("copyright"): LongString(LongString([67, 111, 112, 121, 114, 105, 103, 104, 116, 32, 40, 99, 41, 32, 50, 48, 48, 55, 45, 50, 48, 50, 51, 32, 86, 77, 119, 97, 114, 101, 44, 32, 73, 110, 99, 46, 32, 111, 114, 32, 105, 116, 115, 32, 97, 102, 102, 105, 108, 105, 97, 116, 101, 115, 46])), ShortString("information"): LongString(LongString([76, 105, 99, 101, 110, 115, 101, 100, 32, 117, 110, 100, 101, 114, 32, 116, 104, 101, 32, 77, 80, 76, 32, 50, 46, 48, 46, 32, 87, 101, 98, 115, 105, 116, 101, 58, 32, 104, 116, 116, 112, 115, 58, 47, 47, 114, 97, 98, 98, 105, 116, 109, 113, 46, 99, 111, 109])), ShortString("platform"): LongString(LongString([69, 114, 108, 97, 110, 103, 47, 79, 84, 80, 32, 50, 53, 46, 51, 46, 50, 46, 56])), ShortString("product"): LongString(LongString([82, 97, 98, 98, 105, 116, 77, 81])), ShortString("version"): LongString(LongString([51, 46, 49, 50, 46, 49, 48]))}), mechanisms: LongString([80, 76, 65, 73, 78, 32, 65, 77, 81, 80, 76, 65, 73, 78]), locales: LongString([101, 110, 95, 85, 83]) })))
2023-12-25T16:13:55.776275Z TRACE lapin::channel: Server sent connection::Start method=Start { version_major: 0, version_minor: 9, server_properties: FieldTable({ShortString("capabilities"): FieldTable(FieldTable({ShortString("authentication_failure_close"): Boolean(true), ShortString("basic.nack"): Boolean(true), ShortString("connection.blocked"): Boolean(true), ShortString("consumer_cancel_notify"): Boolean(true), ShortString("consumer_priorities"): Boolean(true), ShortString("direct_reply_to"): Boolean(true), ShortString("exchange_exchange_bindings"): Boolean(true), ShortString("per_consumer_qos"): Boolean(true), ShortString("publisher_confirms"): Boolean(true)})), ShortString("cluster_name"): LongString(LongString([114, 97, 98, 98, 105, 116, 64, 56, 52, 53, 98, 97, 52, 99, 55, 100, 97, 54, 57])), ShortString("copyright"): LongString(LongString([67, 111, 112, 121, 114, 105, 103, 104, 116, 32, 40, 99, 41, 32, 50, 48, 48, 55, 45, 50, 48, 50, 51, 32, 86, 77, 119, 97, 114, 101, 44, 32, 73, 110, 99, 46, 32, 111, 114, 32, 105, 116, 115, 32, 97, 102, 102, 105, 108, 105, 97, 116, 101, 115, 46])), ShortString("information"): LongString(LongString([76, 105, 99, 101, 110, 115, 101, 100, 32, 117, 110, 100, 101, 114, 32, 116, 104, 101, 32, 77, 80, 76, 32, 50, 46, 48, 46, 32, 87, 101, 98, 115, 105, 116, 101, 58, 32, 104, 116, 116, 112, 115, 58, 47, 47, 114, 97, 98, 98, 105, 116, 109, 113, 46, 99, 111, 109])), ShortString("platform"): LongString(LongString([69, 114, 108, 97, 110, 103, 47, 79, 84, 80, 32, 50, 53, 46, 51, 46, 50, 46, 56])), ShortString("product"): LongString(LongString([82, 97, 98, 98, 105, 116, 77, 81])), ShortString("version"): LongString(LongString([51, 46, 49, 50, 46, 49, 48]))}), mechanisms: LongString([80, 76, 65, 73, 78, 32, 65, 77, 81, 80, 76, 65, 73, 78]), locales: LongString([101, 110, 95, 85, 83]) }
2023-12-25T16:13:55.776344Z TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial
2023-12-25T16:13:55.776354Z TRACE lapin::io_loop: io_loop run
2023-12-25T16:13:55.776362Z TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
2023-12-25T16:13:55.776400Z TRACE lapin::channel: send_frame channel=0
2023-12-25T16:13:55.776415Z TRACE lapin::channel: wake channel=0
2023-12-25T16:13:55.776456Z TRACE lapin::socket_state: Got event for socket event=Wake
2023-12-25T16:13:55.776492Z TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Connection(StartOk(StartOk { client_properties: FieldTable({ShortString("capabilities"): FieldTable(FieldTable({ShortString("authentication_failure_close"): Boolean(true), ShortString("basic.nack"): Boolean(true), ShortString("connection.blocked"): Boolean(true), ShortString("consumer_cancel_notify"): Boolean(true), ShortString("consumer_priorities"): Boolean(true), ShortString("direct_reply_to"): Boolean(true), ShortString("exchange_exchange_bindings"): Boolean(true), ShortString("per_consumer_qos"): Boolean(true), ShortString("publisher_confirms"): Boolean(true)})), ShortString("platform"): LongString(LongString([114, 117, 115, 116])), ShortString("product"): LongString(LongString([108, 97, 112, 105, 110])), ShortString("version"): LongString(LongString([50, 46, 51, 46, 49]))}), mechanism: ShortString("PLAIN"), response: LongString([0, 103, 117, 101, 115, 116, 0, 103, 117, 101, 115, 116]), locale: ShortString("en_US") })))
2023-12-25T16:13:55.776536Z TRACE lapin::io_loop: wrote 315 bytes
2023-12-25T16:13:55.776548Z TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial
2023-12-25T16:13:55.776574Z TRACE lapin::io_loop: io_loop run
2023-12-25T16:13:55.776584Z TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
2023-12-25T16:13:55.776780Z TRACE lapin::socket_state: Got event for socket event=Readable
2023-12-25T16:13:55.776792Z TRACE lapin::io_loop: read 20 bytes
2023-12-25T16:13:55.776807Z TRACE lapin::channels: will handle frame frame=Method(0, Connection(Tune(Tune { channel_max: 2047, frame_max: 131072, heartbeat: 60 })))
2023-12-25T16:13:55.776815Z TRACE lapin::channel: Server sent Connection::Tune method=Tune { channel_max: 2047, frame_max: 131072, heartbeat: 60 }
2023-12-25T16:13:55.776830Z TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial
2023-12-25T16:13:55.776840Z TRACE lapin::io_loop: io_loop run
2023-12-25T16:13:55.776852Z TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
2023-12-25T16:13:55.776885Z TRACE lapin::channel: send_frame channel=0
2023-12-25T16:13:55.776895Z TRACE lapin::channel: wake channel=0
2023-12-25T16:13:55.776914Z TRACE lapin::socket_state: Got event for socket event=Wake
2023-12-25T16:13:55.776923Z TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Connection(TuneOk(TuneOk { channel_max: 2047, frame_max: 131072, heartbeat: 60 })))
2023-12-25T16:13:55.776944Z TRACE lapin::io_loop: wrote 20 bytes
2023-12-25T16:13:55.776963Z TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial
2023-12-25T16:13:55.776971Z TRACE lapin::channel: send_frame channel=0
2023-12-25T16:13:55.776973Z TRACE lapin::io_loop: io_loop run
2023-12-25T16:13:55.777016Z TRACE lapin::frames: state is now waiting channel=0 expected_reply=ExpectedReply(ConnectionOpenOk(Pinky, Connection { configuration: Configuration { channel_max: 2047, frame_max: 131072, heartbeat: 60 }, status: ConnectionStatus { state: Connecting, vhost: "wrong", username: "guest", blocked: false }, channels: Channels { channels: [Channel { id: 0, configuration: Configuration { channel_max: 2047, frame_max: 131072, heartbeat: 60 }, status: ChannelStatus { state: Connected, receiver_state: ChannelReceiverStates([]), confirm: false, send_flow: true }, connection_status: ConnectionStatus { state: Connecting, vhost: "wrong", username: "guest", blocked: false }, acknowledgements: Acknowledgements { delivery_tag: IdSequence { allow_zero: false, max: None, id: 0 }, returned_messages: ReturnedMessages { waiting_messages: [], messages: [], non_confirm_messages: [] }, pending: [] }, consumers: Consumers({}), basic_get_delivery: BasicGetDelivery, returned_messages: ReturnedMessages { waiting_messages: [], messages: [], non_confirm_messages: [] }, frames: Frames }], channel_id: IdSequence { allow_zero: false, max: None, id: 0 }, configuration: Configuration { channel_max: 2047, frame_max: 131072, heartbeat: 60 }, frames: Frames, connection_status: ConnectionStatus { state: Connecting, vhost: "wrong", username: "guest", blocked: false }, error_handler: ErrorHandler } }))
2023-12-25T16:13:55.777053Z TRACE lapin::channel: wake channel=0
2023-12-25T16:13:55.777063Z TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=true
2023-12-25T16:13:55.777074Z TRACE lapin::socket_state: Got event for socket event=Wake
2023-12-25T16:13:55.777084Z TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Connection(Open(Open { virtual_host: ShortString("wrong") })))
2023-12-25T16:13:55.777114Z TRACE lapin::io_loop: wrote 20 bytes
2023-12-25T16:13:55.777130Z TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial
2023-12-25T16:13:55.777143Z TRACE lapin::io_loop: io_loop run
2023-12-25T16:13:55.777156Z TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
2023-12-25T16:13:55.777424Z TRACE lapin::socket_state: Got event for socket event=Readable
2023-12-25T16:13:55.777442Z TRACE lapin::io_loop: read 54 bytes
2023-12-25T16:13:55.777465Z TRACE lapin::channels: will handle frame frame=Method(0, Connection(Close(Close { reply_code: 530, reply_text: ShortString("NOT_ALLOWED - vhost wrong not found"), class_id: 10, method_id: 40 })))
2023-12-25T16:13:55.777505Z ERROR lapin::channel: Connection closed channel=0 method=Close { reply_code: 530, reply_text: ShortString("NOT_ALLOWED - vhost wrong not found"), class_id: 10, method_id: 40 } error=AMQPError { kind: Hard(NOTALLOWED), message: ShortString("NOT_ALLOWED - vhost wrong not found") }
2023-12-25T16:13:55.777526Z TRACE lapin::internal_rpc: Queuing internal RPC command command=SetConnectionClosing
2023-12-25T16:13:55.777569Z TRACE lapin::internal_rpc: Handling internal RPC command command=SetConnectionClosing
2023-12-25T16:13:55.777614Z TRACE lapin::internal_rpc: Queuing internal RPC command command=SetConnectionClosing
2023-12-25T16:13:55.777612Z TRACE lapin::internal_rpc: Queuing internal RPC command command=SetConnectionError(ProtocolError(AMQPError { kind: Hard(NOTALLOWED), message: ShortString("NOT_ALLOWED - vhost wrong not found") }))
2023-12-25T16:13:55.777629Z TRACE lapin::internal_rpc: Queuing internal RPC command command=CloseConnection(200, "OK", 0, 0)
2023-12-25T16:13:55.777636Z TRACE lapin::internal_rpc: Handling internal RPC command command=SetConnectionClosing
2023-12-25T16:13:55.777649Z TRACE lapin::internal_rpc: Handling internal RPC command command=SetConnectionError(ProtocolError(AMQPError { kind: Hard(NOTALLOWED), message: ShortString("NOT_ALLOWED - vhost wrong not found") }))
2023-12-25T16:13:55.777655Z TRACE lapin::internal_rpc: Queuing internal RPC command command=SendConnectionCloseOk(ProtocolError(AMQPError { kind: Hard(NOTALLOWED), message: ShortString("NOT_ALLOWED - vhost wrong not found") }))
2023-12-25T16:13:55.777668Z ERROR lapin::channels: Connection error error=protocol error: AMQP hard error: NOT-ALLOWED: NOT_ALLOWED - vhost wrong not found
2023-12-25T16:13:55.777671Z TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial
2023-12-25T16:13:55.777687Z TRACE lapin::internal_rpc: Stopping internal RPC command
2023-12-25T16:13:55.777684Z TRACE lapin::internal_rpc: Queuing internal RPC command command=RemoveChannel(0, ProtocolError(AMQPError { kind: Hard(NOTALLOWED), message: ShortString("NOT_ALLOWED - vhost wrong not found") }))
2023-12-25T16:13:55.777703Z TRACE lapin::internal_rpc: Handling internal RPC command command=CloseConnection(200, "OK", 0, 0)
2023-12-25T16:13:55.777721Z TRACE lapin::internal_rpc: Handling internal RPC command command=SendConnectionCloseOk(ProtocolError(AMQPError { kind: Hard(NOTALLOWED), message: ShortString("NOT_ALLOWED - vhost wrong not found") }))
2023-12-25T16:13:55.777752Z ERROR vector::topology::builder: Configuration error. error=Sink "amqp-sink": creating amqp producer failed: protocol error: AMQP hard error: NOT-ALLOWED: NOT_ALLOWED - vhost wrong not found
2023-12-25T16:13:55.777761Z TRACE lapin::internal_rpc: InternalRPC stopped
2023-12-25T16:13:55.777790Z TRACE lapin::internal_rpc: Queuing internal RPC command command=SetConnectionError(InvalidChannelState(Error))
2023-12-25T16:13:55.777826Z TRACE lapin::internal_rpc: Queuing internal RPC command command=SetConnectionError(InvalidChannelState(Error))

Example Data

No response

Additional Context

vector is installed with apt and rabbitmq is up on port 5672 with following docker-compose deployment:

services: 
  rabbitmq:
      image: rabbitmq:management
      container_name: rabbitmq
      environment:
        - RABBITMQ_DEFAULT_USER=guest
        - RABBITMQ_DEFAULT_PASS=guest
      ports:
        - "5672:5672"
        - "15672:15672"

References

No response

@mtrbpr mtrbpr added the type: bug A code related bug. label Dec 25, 2023
@sxkote
Copy link

sxkote commented Jan 25, 2024

Hi. I get the same behavior.
Even checkhealthy:enabled=false
When the RabbitMQ falls and then reruns, Vector stops to send messages into it.

@fedorkanin
Copy link

That's hilarious. We are experiencing similar issues with blocking behavior in amqp091-go client (issue1, issue2) and considered Vector as a replacement. Now it seems I have encountered a similar issue to @sxkote's.

Nov 11 13:07:57 my-rabbit-host.example vector[1234567]: 2024-11-11T13:07:57.516987Z ERROR sink{component_kind="sink" component_id=rabbitmq_sink component_type=amqp}:request{request_id=REQUEST_ID_1}: vector_common::internal_event::service: Service call failed. No retries or retries exhausted. error=Some(AcknowledgementFailed { error: IOError(Os { code: 104, kind: ConnectionReset, message: "Connection reset by peer" }) }) request_id=REQUEST_ID_1 error_type="request_failed" stage="sending" internal_log_rate_limit=true
...
some suppressed logs
...
Nov 11 13:08:13 my-rabbit-host.example vector[1234567]: 2024-11-11T13:08:13.215606Z ERROR sink{component_kind="sink" component_id=rabbitmq_sink component_type=amqp}:request{request_id=REQUEST_ID_2}: vector_common::internal_event::service: Service call failed. No retries or retries exhausted. error=Some(DeliveryFailed { error: InvalidChannelState(Error) }) request_id=REQUEST_ID_2 error_type="request_failed" stage="sending" internal_log_rate_limit=true

This log with InvalidChannelState(Error) is then logged forever alongside with some suppressed logs, the reconnect does not happen. If restarted, Vector connects and functions as expected.

I suggest this issue should be replicated in (integration) tests. For now I was unable to find out if a root cause is in lapin.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type: bug A code related bug.
Projects
None yet
Development

No branches or pull requests

3 participants