Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_es: Allow Logstash_Prefix_Key with prefix #8902

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

jplitza
Copy link

@jplitza jplitza commented May 31, 2024

The option Logstash_Prefix_Key should be a record accessor.

There is a convenience feature that automatically prefixes a $ when the value is just a static string (since that is expected to be set as Logstash_Prefix instead).

However, record accessor can contain the $ not just at the beginning. One example would be
containers-$kubernetes['namespace_name']

This patch thus makes the autodetection look for a $ in the whole value, not just the first character.

Fixes #8773


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • [N/A] Run local packaging test showing all targets (including any new ones) build.
  • [N/A] Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • [N/A] Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

The option Logstash_Prefix_Key should be a record accessor.

There is a convenience feature that automatically prefixes a $ when the value is just a static string (since that is expected to be set as Logstash_Prefix instead).

However, record accessor can contain the $ not just at the beginning. One example would be
  containers-$kubernetes['namespace_name']

This patch thus makes the autodetection look for a $ in the whole value, not just the first character.

Signed-off-by: Jan-Philipp Litza <[email protected]>
@jplitza
Copy link
Author

jplitza commented May 31, 2024

I've tested this against a container with the elasticsearch image.
This config shows that the previous behavior of interpreting a value without $ as an accessor is retained, by writing into the index "prod":

[INPUT]
    Name dummy
    Dummy {"namespace": "prod", "message": "Test"}

[Output]
    Name es
    Match *
    HTTP_User elastic
    HTTP_Passwd foo # (needs to be generated inside the container with `bin/elasticsearch-reset-password -u elastic`)
    Type _doc
    Tls on
    tls.verify off
    Logstash_Format On
    Logstash_Prefix namespace-unknown
    Logstash_Prefix_Key namespace
    Trace_Error On
    Suppress_Type_Name On

While the following config shows the new behavior of allowing a static prefix and writing to the index "namespace-prod":

[INPUT]
    Name dummy
    Dummy {"namespace": "prod", "message": "Test"}

[Output]
    Name es
    Match *
    HTTP_User elastic
    HTTP_Passwd foo # (needs to be generated inside the container with `bin/elasticsearch-reset-password -u elastic`)
    Type _doc
    Tls on
    tls.verify off
    Logstash_Format On
    Logstash_Prefix namespace-unknown
    Logstash_Prefix_Key namespace-$namespace
    Trace_Error On
    Suppress_Type_Name On

The latter produced the following debug output under valgrind:

$ valgrind --leak-check=full ./bin/fluent-bit -c test.conf -v
==872248== Memcheck, a memory error detector
==872248== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==872248== Using Valgrind-3.18.1 and LibVEX; rerun with -h for copyright info
==872248== Command: ./bin/fluent-bit -c test.conf -v
==872248== 
Fluent Bit v3.0.7
* Copyright (C) 2015-2024 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

___________.__                        __    __________.__  __          ________  
\_   _____/|  |  __ __   ____   _____/  |_  \______   \__|/  |_  ___  _\_____  \ 
 |    __)  |  | |  |  \_/ __ \ /    \   __\  |    |  _/  \   __\ \  \/ / _(__  < 
 |     \   |  |_|  |  /\  ___/|   |  \  |    |    |   \  ||  |    \   / /       \
 \___  /   |____/____/  \___  >___|  /__|    |______  /__||__|     \_/ /______  /
     \/                     \/     \/               \/                        \/ 

[2024/05/31 15:15:24] [ info] Configuration:
[2024/05/31 15:15:24] [ info]  flush time     | 1.000000 seconds
[2024/05/31 15:15:24] [ info]  grace          | 5 seconds
[2024/05/31 15:15:24] [ info]  daemon         | 0
[2024/05/31 15:15:24] [ info] ___________
[2024/05/31 15:15:24] [ info]  inputs:
[2024/05/31 15:15:24] [ info]      dummy
[2024/05/31 15:15:24] [ info] ___________
[2024/05/31 15:15:24] [ info]  filters:
[2024/05/31 15:15:24] [ info] ___________
[2024/05/31 15:15:24] [ info]  outputs:
[2024/05/31 15:15:24] [ info]      es.0
[2024/05/31 15:15:24] [ info] ___________
[2024/05/31 15:15:24] [ info]  collectors:
[2024/05/31 15:15:24] [ info] [fluent bit] version=3.0.7, commit=8348709e55, pid=872248
[2024/05/31 15:15:24] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2024/05/31 15:15:24] [ info] [storage] ver=1.5.2, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2024/05/31 15:15:24] [ info] [cmetrics] version=0.9.0
[2024/05/31 15:15:24] [ info] [ctraces ] version=0.5.1
[2024/05/31 15:15:24] [ info] [input:dummy:dummy.0] initializing
[2024/05/31 15:15:24] [ info] [input:dummy:dummy.0] storage_strategy='memory' (memory only)
[2024/05/31 15:15:24] [debug] [dummy:dummy.0] created event channels: read=22 write=24
[2024/05/31 15:15:24] [debug] [es:es.0] created event channels: read=26 write=27
[2024/05/31 15:15:24] [debug] [output:es:es.0] host=127.0.0.1 port=9200 uri=/_bulk index=fluent-bit type=_doc
[2024/05/31 15:15:24] [ info] [output:es:es.0] worker #0 started
[2024/05/31 15:15:24] [ info] [output:es:es.0] worker #1 started
[2024/05/31 15:15:24] [ info] [sp] stream processor started
[2024/05/31 15:15:26] [debug] [task] created task=0x5327a60 id=0 OK
[2024/05/31 15:15:26] [debug] [output:es:es.0] task_id=0 assigned to thread #0
[2024/05/31 15:15:27] [debug] [upstream] KA connection #52 to 127.0.0.1:9200 is connected
[2024/05/31 15:15:27] [debug] [http_client] not using http_proxy for header
[2024/05/31 15:15:27] [debug] [task] created task=0x5426300 id=1 OK
[2024/05/31 15:15:27] [debug] [output:es:es.0] task_id=1 assigned to thread #1
[2024/05/31 15:15:27] [debug] [output:es:es.0] HTTP Status=200 URI=/_bulk
[2024/05/31 15:15:27] [debug] [output:es:es.0] Elasticsearch response
{"errors":false,"took":9,"items":[{"create":{"_index":"namespace-prod-2024.05.31","_id":"pRLKzo8BRVHJWXgMe7P_","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":20,"_primary_term":1,"status":201}}]}
[2024/05/31 15:15:27] [debug] [upstream] KA connection #52 to 127.0.0.1:9200 is now available
[2024/05/31 15:15:27] [debug] [out flush] cb_destroy coro_id=0
[2024/05/31 15:15:27] [debug] [task] destroy task=0x5327a60 (task_id=0)
[2024/05/31 15:15:27] [debug] [upstream] KA connection #53 to 127.0.0.1:9200 is connected
[2024/05/31 15:15:27] [debug] [http_client] not using http_proxy for header
[2024/05/31 15:15:27] [debug] [output:es:es.0] HTTP Status=200 URI=/_bulk
[2024/05/31 15:15:27] [debug] [output:es:es.0] Elasticsearch response
{"errors":false,"took":10,"items":[{"create":{"_index":"namespace-prod-2024.05.31","_id":"phLKzo8BRVHJWXgMfLOt","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":21,"_primary_term":1,"status":201}}]}
[2024/05/31 15:15:27] [debug] [upstream] KA connection #53 to 127.0.0.1:9200 is now available
[2024/05/31 15:15:27] [debug] [out flush] cb_destroy coro_id=0
[2024/05/31 15:15:27] [debug] [task] destroy task=0x5426300 (task_id=1)
[2024/05/31 15:15:28] [debug] [task] created task=0x75198b0 id=0 OK
[2024/05/31 15:15:28] [debug] [output:es:es.0] task_id=0 assigned to thread #0
[2024/05/31 15:15:28] [debug] [upstream] KA connection #52 to 127.0.0.1:9200 has been assigned (recycled)
[2024/05/31 15:15:28] [debug] [http_client] not using http_proxy for header
[2024/05/31 15:15:28] [debug] [output:es:es.0] HTTP Status=200 URI=/_bulk
[2024/05/31 15:15:28] [debug] [output:es:es.0] Elasticsearch response
{"errors":false,"took":8,"items":[{"create":{"_index":"namespace-prod-2024.05.31","_id":"pxLKzo8BRVHJWXgMf7On","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":22,"_primary_term":1,"status":201}}]}
[2024/05/31 15:15:28] [debug] [upstream] KA connection #52 to 127.0.0.1:9200 is now available
[2024/05/31 15:15:28] [debug] [out flush] cb_destroy coro_id=1
[2024/05/31 15:15:28] [debug] [task] destroy task=0x75198b0 (task_id=0)
[2024/05/31 15:15:29] [debug] [task] created task=0x757eb60 id=0 OK
[2024/05/31 15:15:29] [debug] [output:es:es.0] task_id=0 assigned to thread #1
[2024/05/31 15:15:29] [debug] [upstream] KA connection #53 to 127.0.0.1:9200 has been assigned (recycled)
[2024/05/31 15:15:29] [debug] [http_client] not using http_proxy for header
[2024/05/31 15:15:29] [debug] [output:es:es.0] HTTP Status=200 URI=/_bulk
[2024/05/31 15:15:29] [debug] [output:es:es.0] Elasticsearch response
{"errors":false,"took":9,"items":[{"create":{"_index":"namespace-prod-2024.05.31","_id":"qBLKzo8BRVHJWXgMg7Oe","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":23,"_primary_term":1,"status":201}}]}
[2024/05/31 15:15:29] [debug] [task] destroy task=0x757eb60 (task_id=0)
[2024/05/31 15:15:29] [debug] [upstream] KA connection #53 to 127.0.0.1:9200 is now available
[2024/05/31 15:15:29] [debug] [out flush] cb_destroy coro_id=1
^C[2024/05/31 15:15:30] [engine] caught signal (SIGINT)
[2024/05/31 15:15:30] [debug] [task] created task=0x75e1e20 id=0 OK
[2024/05/31 15:15:30] [debug] [output:es:es.0] task_id=0 assigned to thread #0
[2024/05/31 15:15:30] [ warn] [engine] service will shutdown in max 5 seconds
[2024/05/31 15:15:30] [ info] [input] pausing dummy.0
[2024/05/31 15:15:30] [debug] [upstream] KA connection #52 to 127.0.0.1:9200 has been assigned (recycled)
[2024/05/31 15:15:30] [debug] [http_client] not using http_proxy for header
[2024/05/31 15:15:30] [debug] [output:es:es.0] HTTP Status=200 URI=/_bulk
[2024/05/31 15:15:30] [debug] [output:es:es.0] Elasticsearch response
{"errors":false,"took":8,"items":[{"create":{"_index":"namespace-prod-2024.05.31","_id":"qRLKzo8BRVHJWXgMhrMO","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":24,"_primary_term":1,"status":201}}]}
[2024/05/31 15:15:30] [debug] [task] destroy task=0x75e1e20 (task_id=0)
[2024/05/31 15:15:30] [debug] [upstream] KA connection #52 to 127.0.0.1:9200 is now available
[2024/05/31 15:15:30] [debug] [out flush] cb_destroy coro_id=2
[2024/05/31 15:15:30] [ info] [engine] service has stopped (0 pending tasks)
[2024/05/31 15:15:30] [ info] [input] pausing dummy.0
[2024/05/31 15:15:30] [ info] [output:es:es.0] thread worker #0 stopping...
[2024/05/31 15:15:30] [ info] [output:es:es.0] thread worker #0 stopped
[2024/05/31 15:15:30] [ info] [output:es:es.0] thread worker #1 stopping...
[2024/05/31 15:15:30] [ info] [output:es:es.0] thread worker #1 stopped
==872248== 
==872248== HEAP SUMMARY:
==872248==     in use at exit: 0 bytes in 0 blocks
==872248==   total heap usage: 17,284 allocs, 17,284 frees, 4,349,411 bytes allocated
==872248== 
==872248== All heap blocks were freed -- no leaks are possible
==872248== 
==872248== For lists of detected and suppressed errors, rerun with: -s
==872248== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

es: Logstash_Prefix_Key with static prefix
1 participant