
[processor/lsminterval] Handle overflow for metrics aggregations #141

Open
lahsivjar opened this issue Sep 25, 2024 · 16 comments

@lahsivjar (Contributor)

Aggregations could be boundless with huge cardinality or with buggy instrumentation. To protect against this, the aggregated metrics should have limits and should overflow to specific buckets once those limits are breached. This would be similar to what is implemented in https://github.com/elastic/apm-aggregation

lahsivjar added the enhancement (New feature or request) label on Sep 25, 2024
@lahsivjar (Contributor Author)

Current overflow handling in APM Server using apm-aggregation

APM aggregation implements overflow handling logic based on a set of factors, each with its own threshold. These factors are configured in APM Server based on the available resources, or hard-coded if the use case is known. By default, APM Server sets all the limits based on the available memory using linear scaling with the formula mem_available * constant_factor. For example, for services the constant factor is 1000, i.e. an APM Server with 8 GB of memory will allow a maximum of 8000 services (ref).
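As a rough sketch of that scaling rule (the 1000-per-GB factor for services is taken from the text above; the factors for other limits would differ):

package main

import "fmt"

// limitFor scales a limit linearly with the available memory, as described
// above: limit = mem_available_gb * constant_factor.
func limitFor(memAvailableGB, constantFactor int) int {
    return memAvailableGB * constantFactor
}

func main() {
    // With 8 GB of memory and a constant factor of 1000 for services,
    // APM Server caps the number of services at 8000.
    fmt.Println(limitFor(8, 1000)) // 8000
}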

Maximum number of services

The number of unique services for the purpose of aggregated metrics is defined as the cardinality of the following set of labels (identified as service aggregation key):

  • timestamp truncated based on the interval
  • service.name
  • service.environment
  • service.language.name
  • agent.name
  • Global labels

When the max service limit for an aggregation interval is reached, a new service_summary metric is produced with the service name set to _other and the timestamp set to the processing timestamp (service summary metrics in APM Server don't record any numeric value). In addition, another metric named service_summary.aggregation.overflow_count is produced, recording the number of service aggregation keys that overflowed for the given interval.

This overflow service bucket acts as a catch-all for all overflowing aggregated metrics, i.e. in addition to the service count, the overflow service also records service transaction, transaction, and span metrics in their corresponding data types (histograms, aggregate_metric_double, or counters) with the following overflow identifiers (a sketch of this service-level limiting follows the list):

  • Overflow service transaction is identified by transaction.type set to _other with service.name set to _other
  • Overflow transaction is identified by transaction.name set to _other with service.name set to _other
  • Overflow span is identified by service.target.name set to _other with service.name set to _other
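For illustration, a hypothetical Go sketch of this service-level limiting (not the apm-aggregation code; the key fields follow the list above, with global labels omitted):

package main

import "fmt"

// serviceKey mirrors the service aggregation key listed above
// (global labels omitted for brevity).
type serviceKey struct {
    truncatedTimestamp int64 // timestamp truncated to the aggregation interval
    serviceName        string
    serviceEnvironment string
    serviceLanguage    string
    agentName          string
}

type serviceLimiter struct {
    maxServices int
    services    map[serviceKey]struct{}
    overflowed  int // feeds service_summary.aggregation.overflow_count
}

// bucketFor returns the service name to aggregate under: the real service
// name while under the limit, or "_other" once the limit is breached.
func (l *serviceLimiter) bucketFor(k serviceKey) string {
    if _, ok := l.services[k]; ok {
        return k.serviceName
    }
    if len(l.services) < l.maxServices {
        l.services[k] = struct{}{}
        return k.serviceName
    }
    l.overflowed++
    return "_other"
}

func main() {
    l := &serviceLimiter{maxServices: 1, services: map[serviceKey]struct{}{}}
    fmt.Println(l.bucketFor(serviceKey{serviceName: "a"})) // a
    fmt.Println(l.bucketFor(serviceKey{serviceName: "b"})) // _other
    fmt.Println(l.overflowed)                              // 1
}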

Maximum number of service transactions

Service transactions are the number of unique transaction types within a service (this is based on the current key definition and can change in future). The number of unique service transactions for the purpose of aggregated metrics is defined as the cardinality of the following set of labels (identified as service transaction aggregation key):

  • transaction.type

When the maximum service transaction limit within a service is reached, a new metric is created for the service that reached the limit, with transaction.type set to _other. This new service transaction then captures ALL the NEW service transactions for the current interval as overflow, using the metric types required for a normal service transaction metric.

Maximum number of transactions

Transactions are the number of unique transaction keys within the service where a transaction key is defined with the following fields:

  • trace.root (different from the transaction root)
  • container.id
  • kubernetes.pod.name
  • service.version
  • service.node.name
  • service.runtime.name
  • service.runtime.version
  • service.language.version
  • host.hostname
  • host.name
  • host.os.platform
  • event.outcome
  • transaction.name
  • transaction.type
  • transaction.result
  • faas.coldstart
  • faas.id
  • faas.name
  • faas.version
  • faas.trigger.type
  • cloud.provider
  • cloud.region
  • cloud.availability_zone
  • cloud.service.name
  • cloud.account.id
  • cloud.account.name
  • cloud.machine_type
  • cloud.project.id
  • cloud.project.name

When the maximum transaction limit within a service is reached, a new metric is created for the service that reached the limit, with transaction.name set to _other. This new transaction then captures ALL the NEW transactions for the current interval as overflow, using the metric types required for a normal transaction metric.

Maximum number of spans

Spans are the number of unique span keys within a service where the span key is defined with the following fields:

  • span.name
  • event.outcome
  • service.target.type
  • service.target.name
  • service.destination_service.resource

When the maximum span limit within a service is reached, a new metric is created for the service that reached the limit, with service.target.name set to _other. This new span then captures ALL the NEW spans for the current interval as overflow, using the metric types required for a normal span metric.
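All three per-service limits above (service transactions, transactions, and spans) follow the same pattern; here is a hypothetical sketch of that pattern, with illustrative names rather than the apm-aggregation API:

package main

import "fmt"

// perServiceLimiter caps the number of unique group keys (service transaction,
// transaction, or span keys) tracked within a single service.
type perServiceLimiter struct {
    maxGroups int
    groups    map[string]map[string]struct{} // service name -> set of group keys
}

// resolve returns the group key to aggregate under: the original key while the
// service is under its limit, or "_other" once the limit is breached.
func (l *perServiceLimiter) resolve(service, groupKey string) string {
    set, ok := l.groups[service]
    if !ok {
        set = map[string]struct{}{}
        l.groups[service] = set
    }
    if _, ok := set[groupKey]; ok {
        return groupKey
    }
    if len(set) < l.maxGroups {
        set[groupKey] = struct{}{}
        return groupKey
    }
    // e.g. transaction.type, transaction.name, or service.target.name set to _other
    return "_other"
}

func main() {
    l := &perServiceLimiter{maxGroups: 2, groups: map[string]map[string]struct{}{}}
    fmt.Println(l.resolve("svc-a", "request"))   // request
    fmt.Println(l.resolve("svc-a", "messaging")) // messaging
    fmt.Println(l.resolve("svc-a", "scheduled")) // _other
    fmt.Println(l.resolve("svc-b", "request"))   // request: svc-b has its own budget
}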

@lahsivjar (Contributor Author) commented Oct 24, 2024

Proposal for overflow handling in LSM interval processor

The signal-to-metrics connector and the LSM interval processor together provide the OTel-native way to do the aggregations required by APM. The signal-to-metrics connector extracts the aggregated metrics from the incoming signals, and the LSM interval processor aggregates those metrics over the specified aggregation intervals. A sample configuration of both components producing APM aggregated metrics can be perused here.

The OTel implementation differs from the APM implementation because the aggregated metrics cannot be first-class entities. Instead, they have to be defined like any other metrics and processed by the components. To formalize overflow handling within the OTel data model, we have to identify the problem that the overflow handling in APM solves.

Problem

The main purpose of aggregating metrics is to reduce storage and query costs by performing aggregations during ingestion. Aggregation shines when the cardinality of the attributes being aggregated is bounded. High-cardinality attributes blow up the memory requirements, increasing the resources needed for aggregation during ingestion. Unbounded cardinality makes things worse, and aggregations over longer intervals become almost impractical. Unbounded/high cardinality is usually due to bugs in instrumentation or simply bad instrumentation.

Assumptions

  1. Aggregations are performed with a specific set of attributes or the attributes are known and defined.
  2. Cardinality for the attributes over which aggregations are performed is expected to be bounded within the defined aggregation intervals.
  3. The component (in our case lsminterval processor) is given enough memory to handle aggregations with the expected cardinality.

Proposal

Solving the problem of unbounded/high cardinality attributes requires identifying that there is an issue and reporting it to the owners of the instrumentation. To this end, our OTel pipeline needs a way to identify the cardinality issues during aggregation.

Keeping the above assumptions in mind, the proposal acts like a cardinality limiter over a defined set of attributes, with overflow buckets for when the cardinality exceeds the defined limits.

(The configuration names might be a bit confusing; they will be updated with better names as this evolves.)

limits:
  - action: oneOf{"drop", "overflow"} # if drop is configured the metrics exceeding the limit are simply dropped
    resource_attributes: # A list of resource attributes over which to apply the cardinality limits
      - key: <string> # If the resource is not present in the input then an empty value will be used for cardinality calculation
    scope_attributes: [] # A list of scope attributes over which to apply the cardinality limit, empty means no limits
    datapoint_attributes: [] # A list of datapoint attributes over which to apply the cardinality limit
    max_size: <int> # the max cardinality for the above set of attributes
    # The below configuration is only used if action is `overflow`
    overflow: # Defines how overflow buckets/resource metrics will be constructed
      resource_attributes: # A list of static resource attributes to add to the overflow buckets
        - key: <string>
          value: <any>
      scope_attributes: []
      datapoint_attributes: []

A few points to note:

  1. All the defined limits will be applied in the order they are defined -- this means that overflows can be produced from overflows.
  2. In case of an overflow, all the attributes defined in resource_attributes, scope_attributes, or datapoint_attributes will be stripped from the incoming data. The resulting metric will then be enriched with the attributes defined in the overflow section and added to an existing or new datapoint.
  3. Point 2 means that we could still have a high cardinality on the overflow bucket as there could be more attributes than what we define in limits. However, assuming we know the attributes that are getting aggregated, we can always limit the overflow by producing more overflows with other attributes.
  4. In addition to the above configuration, we could also introduce a global limiter on max number of resource metrics, scope metrics or datapoints as a catch-all protection -- I am not yet sure about how this will interact with the above defined limits.
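For reference, the proposed configuration could map to Go types along these lines inside the processor (a sketch only; the field names mirror the YAML above and are not an existing API):

package main

import "fmt"

// KeyValue is a static attribute added to overflow buckets.
type KeyValue struct {
    Key   string      `mapstructure:"key"`
    Value interface{} `mapstructure:"value"`
}

// Key identifies an attribute over which the cardinality limit is applied.
type Key struct {
    Key string `mapstructure:"key"`
}

// Overflow defines how the overflow buckets are constructed.
type Overflow struct {
    ResourceAttributes  []KeyValue `mapstructure:"resource_attributes"`
    ScopeAttributes     []KeyValue `mapstructure:"scope_attributes"`
    DatapointAttributes []KeyValue `mapstructure:"datapoint_attributes"`
}

// Limit is one entry in the `limits` list of the proposal.
type Limit struct {
    Action              string   `mapstructure:"action"` // "drop" or "overflow"
    ResourceAttributes  []Key    `mapstructure:"resource_attributes"`
    ScopeAttributes     []Key    `mapstructure:"scope_attributes"`
    DatapointAttributes []Key    `mapstructure:"datapoint_attributes"`
    MaxSize             int      `mapstructure:"max_size"`
    Overflow            Overflow `mapstructure:"overflow"` // only used when Action is "overflow"
}

func main() {
    // A limit roughly equivalent to the first entry in the YAML sketch above.
    l := Limit{
        Action:             "overflow",
        ResourceAttributes: []Key{{Key: "service.name"}},
        MaxSize:            100,
        Overflow: Overflow{
            ResourceAttributes: []KeyValue{{Key: "service.name", Value: "_other"}},
        },
    }
    fmt.Printf("%+v\n", l)
}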

@axw (Member) commented Oct 25, 2024

Thanks for the writeup @lahsivjar. Seems reasonable overall. It feels a bit awkward that the definition of metrics & overflow logic are defined in two different places, but I'm not sure if there's a better option.

@felixbarny (Member)

What would help me to understand this a bit better is an example configuration of both the signaltometricsconnector plus the lsmintervalprocessor.

One aspect of the APM aggregations that I don't see mentioned, and I'm not sure is possible in the proposal, is the protection against a single service consuming most of the cardinality limit.

Aside from that, I'm wondering if it could make the configuration a bit more concise to combine the overflow section with the top-level *_attributes sections. For example

    resource_attributes:
      - key: <string>
        overflow_value: <any> # optional

@lahsivjar (Contributor Author) commented Nov 12, 2024

What would help me to understand this a bit better is an example configuration of both the signaltometricsconnector plus the lsmintervalprocessor.

Below I have put a sample configuration based on the proposal:

signaltometrics:
    spans:
      - name: transaction.duration.histogram
        description: APM service transaction aggregated metrics as histogram
        include_resource_attributes:
          - key: service.name
          - key: deployment.environment
          - key: telemetry.sdk.language
          - key: agent.name
        attributes:
          - key: transaction.root
          - key: transaction.type
          - key: metricset.name
            default_value: service_transaction
          - key: elasticsearch.mapping.hints
            default_value: [_doc_count]
        unit: us
        exponential_histogram:
          value: Microseconds(end_time - start_time)

lsminterval:
    intervals:
      - duration: 60m
        statements:
          - set(resource.attributes["metricset.interval"], "60m")
          - set(attributes["data_stream.dataset"], Concat([attributes["metricset.name"], "60m"], "."))
          - set(attributes["processor.event"], "metric")
    limits:
      - action: overflow
        resource_attributes:
          - key: service.name
          - key: deployment.environment
          - key: telemetry.sdk.language
          - key: agent.name
        max_size: 100 # Only 100 services can exist, more than that will overflow
        overflow: # Defines what attributes the service overflow will have
          # we will also need single-writer handling as we do in signaltometrics
          resource_attributes:
            - key: service.name
              value: _other
      - action: overflow
        resource_attributes:
          - key: service.name
          - key: deployment.environment
          - key: telemetry.sdk.language
          - key: agent.name
        datapoint_attributes:
          - key: transaction.root
          - key: transaction.type
          - key: metricset.name
        max_size: 1000
        overflow: # Defines what attributes the overflow aggregation will have
          # we will also need single-writer handling as we do in signaltometrics
          datapoint_attributes:
            - key: transaction.type
              value: _other

One aspect of the APM aggregations that I don't see mentioned, and I'm not sure is possible in the proposal, is the protection against a single service consuming most of the cardinality limit.

In the proposal, the cardinality limits are applied to a defined set of attributes, and we don't have global limits yet. For apm-aggregation, we have the following limits (ref):

(✅ means we cover this in the above proposal and ⛔ means we don't cover this yet)

  • MaxServices ✅ : maximum number of unique services.
  • MaxSpanGroups ⛔ : maximum limit on total span groups.
  • MaxSpanGroupsPerService ✅ : maximum limit on the number of span groups for a service.
  • MaxTransactionGroups ⛔ : maximum limit on the total transaction groups.
  • MaxTransactionGroupsPerService ✅ : maximum number of transaction groups per service.
  • MaxServiceTransactionGroups ⛔ : maximum limit on the total service transaction groups.
  • MaxServiceTransactionGroupsPerService ✅ : maximum number of service transaction groups per service.

We could apply a global configuration on the limits to take care of this (I proposed this here in my notes too):

In addition to the above configuration, we could also introduce a global limiter on max number of resource metrics, scope metrics or datapoints as a catch-all protection -- I am not yet sure about how this will interact with the above defined limits.

I am working on fleshing this out a bit more as well as thinking about some other mechanism to reach our end goal. I have an idea to make this work with the current configuration but not sure on the details - will follow up on this. @felixbarny WDYT? Does it make sense so far?

@felixbarny (Member)

I think what's missing is a way to enforce isolation/partitioning of services. I'd like to see this as a first class citizen of the lsmintervalprocessor.

The current proposal has a noisy neighbor-like problem where services with a high cardinality for deployment.environment, transaction.type, or (not in this example) transaction.name can starve other services in the sense that all metrics for well-behaving services may land right in the overflow bucket because a single rogue service has claimed all resources. This is especially an issue when running the lsmintervalprocessor in a multi-tenant environment. But even if it's just a single tenant with a lot of services, each service should have a guaranteed number of aggregation buckets available that can't be taken away from other services.

I think another big benefit of creating isolated partitions for each service is that the merging process of the LSM can work on smaller units, so that the peak memory requirement can be a lot lower. IIUC, the main source of memory usage is loading all metrics stored in segment files into main memory during the compaction process.

What I'm thinking is that you can define a partitioning key, for example ['project.id', 'service.name'], and define a limit for that. When routing events to different instances of collectors running the lsmintervalprocessor, we need to ensure that we don't get over that limit. The limit defines how many services a single instance can safely aggregate while guaranteeing to never run out of memory, and should be configured based on the memory allocated to the collector instance. We can scale aggregation instances depending on the number of services and depending on how many services can be aggregated by a single instance, so there's not really a limit in terms of number of services we can support. (That's unless we want to limit the number of services a single tenant can have.)

Within each partition (service), we can then limit the cardinality of the different metrics and create overflow buckets. But one partition can never take away resources (aggregation buckets) from another partition. This also helps to ensure we never run out of memory as it creates an upper bound for the memory required by a single partition. Based on that, we can calculate how many partitions a single instance can handle per GB of allocated memory.

@lahsivjar (Contributor Author)

Discussed this with @felixbarny IRL:

But even if it's just a single tenant with a lot of services, each service should have a guaranteed number of aggregation buckets available that can't be taken away from other services.

The current proposal doesn't apply any global limit, so each service (or set of attributes) will get its fair share of aggregation buckets. This means the global limit is implicitly defined by the other limits. Another point to note is that, as per the current proposal, the limits are applied in the order they are defined, so the order is important.

In addition to this, we also discussed some other points:

Not limiting the number of services for the aggregation

The basic idea here is to have no overflow for the number of services, i.e. each service gets its own aggregation bucket, while the aggregation buckets within a service remain bounded. With the current design of the processor we would end up with unbounded memory usage if we adopted this; however, we can make it work with the following improvements to the component:

  1. Introduce customized data partitioning based on configurable resource attributes (for example service.name).
  2. Configure the LSM interval processor to use a sufficiently large disk for EDOT so that the services produced within an hour can be aggregated.

Since the memory usage of the LSM interval processor is proportional to the maximum memory requirement of a partition, we get a bounded memory requirement if the service is used as the partitioning key and overflows are properly defined within a service. This assumes that merges and harvests performed by the LSM aggregator use a concurrency level of 1 (i.e. only one merge/harvest operation at a time).
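As a back-of-the-envelope version of that argument (the numbers are made up; the real per-partition bound would follow from the configured limits and datapoint sizes):

package main

import "fmt"

func main() {
    // Assumed per-partition bound: the configured limits cap a service at,
    // say, 2048 datapoints, and each datapoint costs roughly 2 KiB.
    const maxDatapointsPerService = 2048
    const bytesPerDatapoint = 2 * 1024
    const perPartitionBound = maxDatapointsPerService * bytesPerDatapoint

    // With merge/harvest concurrency of 1, roughly one partition is resident
    // at a time, so peak memory is bounded by the per-partition bound (plus
    // overhead) rather than by the total number of services stored on disk.
    fmt.Printf("per-partition bound: %d MiB\n", perPartitionBound/(1024*1024))
}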

Scaling collectors performing aggregations

Horizontally scaling collectors configured with the LSM interval processor is challenging in circumstances where resource usage cannot be reduced proportionally to the number of collector instances performing aggregations. For example: if data for a specific service is sent to all replicas of the collector and we are doing aggregations using histograms, the same memory is required for each service on every replica, even though each replica handles a lower throughput of data.

This could be addressed by consistent hash-based routing to collector replicas, or consistent hash-based partition assignment in the case of Kafka receivers. Alternatively, if we use the service as the partitioning key and overflows are bounded within a service, we might be able to get reasonably low and bounded memory requirements and not care much about this point.
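As a sketch of what such routing could look like, here is rendezvous (highest-random-weight) hashing over a partition key, one way to get consistent assignment of services to replicas; the replica names and the key choice are illustrative only:

package main

import (
    "fmt"
    "hash/fnv"
)

// replicaFor picks a collector replica for a partition key (for example
// service.name, or project.id combined with service.name) using rendezvous
// hashing: adding or removing a replica only remaps the keys it owned.
func replicaFor(partitionKey string, replicas []string) string {
    var best string
    var bestScore uint32
    for _, r := range replicas {
        h := fnv.New32a()
        h.Write([]byte(partitionKey + "|" + r))
        if score := h.Sum32(); best == "" || score > bestScore {
            best, bestScore = r, score
        }
    }
    return best
}

func main() {
    replicas := []string{"collector-0", "collector-1", "collector-2"}
    for _, svc := range []string{"frontend", "checkout", "frontend"} {
        fmt.Printf("%s -> %s\n", svc, replicaFor(svc, replicas))
    }
    // Both "frontend" entries map to the same replica, so a service's
    // aggregation state lives on exactly one instance.
}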

Utilizing memory limiter processor instead of overflows

The idea here is to piggyback on the memory limiter processor in a way that makes overflows redundant. Since the memory limiter processor can limit the memory used by the collector, we could partition the incoming data so that the limiter can be configured to push back to the receivers for partitions that are using too much memory. This would also allow the limits to be set dynamically based on the number of partitions identified. A major drawback is how pushback to receivers could be performed based on customizable partitions (defined by sets of attributes).

The idea falters for async cases where pushing back to receivers could lead to data being dropped; however, if receivers are backed by persistence, we can address this.

@felixbarny (Member) commented Nov 18, 2024

Zooming out a bit, what we're trying to achieve is being able to handle scale within a service (high throughput and/or high cardinality metrics) as well as scale the number of services we can aggregate in a multi-tenant environment.

I think these are the most important aspects to keep in mind for that:

  • Guaranteeing that the processor doesn't run out of memory.
  • Avoid runaway costs per service - services may not adhere to best practices, like ensuring the cardinality of transaction names is limited. We need to gracefully handle misbehaving services by limiting the amount of resources we allocate to a single service, while retaining a good user experience.
  • Resource isolation - a service can't take away resources (like "aggregation buckets") from other services.
  • Freshness - ensuring that the aggregation of a service doesn't fall behind in a way where it's unlikely to be able to catch up again. The metric cardinality of a service is unlikely to get lower over time, while throughput will have bursty and seasonal behaviour.
  • Ability to scale out by deploying more instances, without having to impose limits on the number of supported services. Note that the throughput of a service doesn't necessarily correlate with the cardinality of the metrics generated by it.
  • Allowing for a high resource utilization, even if there are services with very little cardinality or throughput. This can come as a later enhancement and needs to be reconciled with guaranteeing to not run out of memory.

Based on that, I'm not sure if the memory limiter approach that applies back pressure for high cardinality services instead of creating overflow buckets fulfills these requirements. Once a service produces high-cardinality metrics and we're pushing back, it's unlikely that it'll produce metrics with less cardinality in the future. Therefore, we may never be able to catch up with aggregating metrics for the current interval. We also don't necessarily want to scale out to avoid misbehaving services from consuming too many resources.

@lahsivjar (Contributor Author)

Guaranteeing that the processor doesn't run out of memory.

I think "guarantee" here is too strict and a bit out of scope for the component. We would probably need to impose limits dynamically based on the available memory to achieve this, which I am not sure is a good idea.

Avoid runaway costs per service - services may not adhere to best practices, like ensuring the cardinality of transaction names is limited. We need to gracefully handle misbehaving services by limiting the amount of resources we allocate to a single service, while retaining a good user experience.
Resource isolation - a service can't take away resources (like "aggregation buckets") from other services.

Agreed!

Freshness - ensuring that the aggregation of a service doesn't fall behind in a way where it's unlikely to be able to catch up again.

Hmm, it is possible for this to happen if the pebble throughput cannot match the input throughput (this is a factor of disk IO and other parameters related to the input), but I am not sure this is a problem we should be solving in the component. It is more of a scaling problem, and anyone trying to push the component to this limit should be ready to scale horizontally IMO.

Ability to scale out by deploying more instances, without having to impose limits on the number of supported services. Note that the throughput of a service doesn't necessarily correlate with the cardinality of the metrics generated by it.

Completely agree with doing away with any limits outside of the service boundary, but I don't think we need to introduce the service as a first-class citizen for the limit. It is our use case that doesn't want limits outside the service boundary, but the component should be able to impose limits in general.

On a similar point, the ability to scale horizontally is outside the scope of the component IMO. I say this because naively scaling the component horizontally might not always distribute the resource usage (especially memory) and will need to be handled per workload - for example, using Kafka in between with hash-based partitioning, or using approaches to distribute services between the available instances.

Based on that, I'm not sure if the memory limiter approach that applies back pressure for high cardinality services instead of creating overflow buckets fulfills these requirements. Once a service produces high-cardinality metrics and we're pushing back, it's unlikely that it'll produce metrics with less cardinality in the future. Therefore, we may never be able to catch up with aggregating metrics for the current interval. We also don't necessarily want to scale out to avoid misbehaving services from consuming too many resources.

+1 on this, I think we can drop the memory limiter approach.

Updated Proposal

I still think we can continue with the proposal mentioned here as it allows us to define limits in a generic way. The limits can be defined to achieve the following needs:

  1. Only introduce limits at the service boundary
  2. Noisy-neighbour protection, i.e. a bad service cannot take resources away from a good service

Let me know if the proposal makes sense so far or if we need to address any of the above (or any other) points in more detail.

Next steps

  • Confirm the bigger points of the proposal
  • Implement a PoC quickly to explore the options and finalize the finer implementation details
  • Do the final implementation

@axw (Member) commented Nov 20, 2024

I think the proposal above sounds broadly sensible: have a limit on services per project, all other limits are per service. I'd be keen to see a PoC on this.

@felixbarny (Member)

Sorry, I wasn't very clear. In my last comment, I didn't want to imply that all of these aspects would need to be tackled just by the lsm interval processor. These are the end-to-end aspects that the whole system needs to have. Some of them may be in scope for the lsm processor, some may be a part of how we partition the data in the queue. But we should be clear about which aspects are handled where, in a holistic way, so that when we put the different pieces together we're not missing an important aspect. Also, it's totally fine to have an iterative approach to this as long as we're not painting ourselves into a corner. Think of it as some kind of acceptance criteria for the end-to-end solution, where any implementation that satisfies these requirements is acceptable. The intention was to give you more freedom in choosing the right approach by laying out the requirements and constraints more explicitly.

I think the proposal looks good but I wanted to make sure we're also thinking about how aspects that aren't handled directly by the lsm interval processor fit in.

@felixbarny (Member)

have a limit on services per project

I was thinking that this would limit the number of services a single instance can aggregate, not limit the number of services a project can have. I'm not sure we want to limit the number of services per project due to

Ability to scale out by deploying more instances, without having to impose limits on the number of supported services.

@axw (Member) commented Nov 20, 2024

have a limit on services per project

I was thinking that this would limit the number of services a single instance can aggregate, not limit the number of services a project can have. I'm not sure if we want to limit the number of per project due to

When I said "have a limit on services per project", I meant only in the context of an instance of the aggregator.

@lahsivjar (Contributor Author)

Quick update on this, I am working on a PoC implementation and it should be ready in a couple of days.

@lahsivjar (Contributor Author) commented Nov 30, 2024

Apologies for the delay, I had some hiccups in the implementation related to the complexity of the above model and its performance (especially the overhead of encoding the multiple limits proposed above). I made some simplifications to the overflow handling model:

  1. Instead of a list of limits, we now have one limit for each of resource metrics, scope metrics, and datapoints. The meanings of the limits are as follows:
    • Resource metrics limit: The goal of this limit is to contain the cardinality of the resource metrics. Once the max cardinality of the resource metrics is breached, a new overflow resource metrics is created with static attributes based on what is configured in the limits. Any scope metrics, metrics, or datapoints that arrive with a new resource metric after the limit is breached are put in the overflow bucket. Only one overflow bucket exists for resource metrics.
    • Scope metrics limit: The goal of this limit is to contain the cardinality of scope metrics within a resource metrics. Once the max cardinality of the scope metrics is breached, a new overflow scope metrics is created. As with resource metrics, the scope metric has static attributes based on what is configured in the limits. Each resource metric (including the overflow resource metric) gets an overflow scope metric.
    • Datapoints limit: The goal of this limit is to contain the cardinality of datapoints within a scope metric. Once the max cardinality of the datapoints for a scope metric is breached, a new Sum metric with delta temporality and metric name _other is added to the target scope. This Sum metric counts the cardinality of the overflow (see the sketch after this list). Each scope metric gets an overflow metric.
  2. Limits apply to the overflow buckets too. For example: if the resource metric cardinality is breached, causing a new overflow resource metric to be created, then that overflow metric can also overflow in its scope metrics, and so on.
  3. As an example, the model can limit the max number of services being aggregated, with an upper bound on what each aggregated service can have. If in the future we define partitioning (as described in this issue), we can reduce the memory requirements to the max memory required for a single service based on the defined limits.
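A condensed sketch of the datapoint-level behaviour described above (illustrative names, not the draft PR's code): once a scope's datapoint cardinality limit is breached, new datapoint keys are only counted in a delta Sum metric named _other.

package main

import "fmt"

// datapointLimiter caps datapoint cardinality within one scope metrics; keys
// that arrive after the limit is hit are only counted in a delta Sum named "_other".
type datapointLimiter struct {
    maxDatapoints int
    seen          map[string]struct{}
    overflow      map[string]struct{} // a real implementation would likely use an approximate counter (e.g. HyperLogLog)
}

// admit reports whether a datapoint with the given attribute-set key is
// aggregated normally; otherwise its key is counted towards the "_other" Sum.
func (l *datapointLimiter) admit(key string) bool {
    if _, ok := l.seen[key]; ok {
        return true // already-tracked series keep aggregating after the limit is hit (an assumption of this sketch)
    }
    if len(l.seen) < l.maxDatapoints {
        l.seen[key] = struct{}{}
        return true
    }
    l.overflow[key] = struct{}{}
    return false
}

// otherValue is the value of the "_other" Sum: the cardinality of the overflow.
func (l *datapointLimiter) otherValue() int64 { return int64(len(l.overflow)) }

func main() {
    l := &datapointLimiter{
        maxDatapoints: 1,
        seen:          map[string]struct{}{},
        overflow:      map[string]struct{}{},
    }
    fmt.Println(l.admit("transaction.type=request")) // true
    fmt.Println(l.admit("transaction.type=page"))    // false, counted in _other
    fmt.Println(l.admit("transaction.type=request")) // true
    fmt.Println(l.otherValue())                      // 1
}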

Here is a draft PR for this model. Note that the PR is still WIP and requires some refactoring + optimizations before it is in a mergeable state (given we agree on the approach) but it is in a working state. I have also added some simple tests with different metric types to show how the overflow happens for datapoints.

@axw (Member) commented Dec 1, 2024

@lahsivjar I think this approach sounds fine - will need to think more on it as I review the code.

If users need more fine-grained control over the limits, perhaps we could add support for conditions to the processor so it only processes certain metrics? Then users could create multiple instances of the processor for different sets of metrics, each with their own limits. I don't think you would be able to share limits across instances though, so not a perfect replacement.
