Skip to content

Commit

Permalink
Implemented backoff for backend communication (#999)
Browse files Browse the repository at this point in the history
* Implemented backoff for backend communication

* Added unit tests for backend communication backoff

* Added component test

* Added diagnostics to production code part

* Added comments to component test

* Clarified log message

* Removed isInFailedMode
  • Loading branch information
SergeyKleyman authored Jun 22, 2023
1 parent f8c1d58 commit f1c0dac
Show file tree
Hide file tree
Showing 33 changed files with 1,072 additions and 127 deletions.
7 changes: 1 addition & 6 deletions src/ElasticApm/Impl/Clock.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,7 @@ final class Clock implements ClockInterface

public function __construct(LoggerFactory $loggerFactory)
{
$this->logger = $loggerFactory->loggerForClass(
LogCategory::INFRASTRUCTURE,
__NAMESPACE__,
__CLASS__,
__FILE__
)->addContext('this', $this);
$this->logger = $loggerFactory->loggerForClass(LogCategory::INFRASTRUCTURE, __NAMESPACE__, __CLASS__, __FILE__)->addContext('this', $this);

$this->hasMonotonicTimeSource = function_exists('hrtime');
}
Expand Down
3 changes: 1 addition & 2 deletions src/ElasticApm/Impl/ExecutionSegment.php
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,7 @@ protected function __construct(
]
);
}
$this->diffStartTimeWithSystemClockOnBeginInMicroseconds
= TimeUtil::calcDurationInMicrosecondsClampNegativeToZero($this->timestamp, $systemClockNow);
$this->diffStartTimeWithSystemClockOnBeginInMicroseconds = TimeUtil::calcDurationInMicrosecondsClampNegativeToZero($this->timestamp, $systemClockNow);
$this->monotonicBeginTime = $monotonicClockNow;
$this->traceId = $traceId;
$this->setName($name);
Expand Down
25 changes: 20 additions & 5 deletions src/ext/backend_comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "util.h"
#include "util_for_PHP.h"
#include "basic_macros.h"
#include "backend_comm_backoff.h"

#define ELASTIC_APM_CURRENT_LOG_CATEGORY ELASTIC_APM_LOG_CATEGORY_BACKEND_COMM

Expand Down Expand Up @@ -190,9 +191,10 @@ struct ConnectionData
{
CURL* curlHandle;
struct curl_slist* requestHeaders;
BackendCommBackoff backoff;
};
typedef struct ConnectionData ConnectionData;
ConnectionData g_connectionData = { .curlHandle = NULL, .requestHeaders = NULL };
ConnectionData g_connectionData = { .curlHandle = NULL, .requestHeaders = NULL, .backoff = ELASTIC_APM_DEFAULT_BACKEND_COMM_BACKOFF };

void cleanupConnectionData( ConnectionData* connectionData )
{
Expand Down Expand Up @@ -465,11 +467,16 @@ ResultCode syncSendEventsToApmServerWithConn( const ConfigSnapshot* config, Conn
ELASTIC_APM_SET_RESULT_CODE_AND_GOTO_FAILURE();
}

long responseCode;
long responseCode = 0;
curl_easy_getinfo( connectionData->curlHandle, CURLINFO_RESPONSE_CODE, &responseCode );
ELASTIC_APM_LOG_DEBUG( "Sent events to APM Server. Response HTTP code: %ld. URL: `%s'.", responseCode, url );

resultCode = resultSuccess;
/**
* If the HTTP response status code isn’t 2xx or if a request is prematurely closed (either on the TCP or HTTP level) the request MUST be considered failed.
*
* @see https://github.com/elastic/apm/blob/d8cb5607dbfffea819ab5efc9b0743044772fb23/specs/agents/transport.md#transport-errors
*/
bool isFailed = ( responseCode / 100 ) != 2;
ELASTIC_APM_LOG_WITH_LEVEL( isFailed ? logLevel_error : logLevel_debug, "Sent events to APM Server. Response HTTP code: %ld. URL: `%s'.", responseCode, url );
resultCode = isFailed ? resultFailure : resultSuccess;
finally:
ELASTIC_APM_LOG_DEBUG_RESULT_CODE_FUNCTION_EXIT();
return resultCode;
Expand Down Expand Up @@ -505,19 +512,27 @@ ResultCode syncSendEventsToApmServer( const ConfigSnapshot* config, StringView u
ELASTIC_APM_SET_RESULT_CODE_TO_SUCCESS_AND_GOTO_FINALLY();
}

if ( backendCommBackoff_shouldWait( &connectionData->backoff ) )
{
ELASTIC_APM_LOG_DEBUG( "Backoff wait time has not elapsed yet - discarding events instead of sending" );
ELASTIC_APM_SET_RESULT_CODE_TO_SUCCESS_AND_GOTO_FINALLY();
}

if ( connectionData->curlHandle == NULL )
{
ELASTIC_APM_CALL_IF_FAILED_GOTO( initConnectionData( config, connectionData, userAgentHttpHeader ) );
}

ELASTIC_APM_CALL_IF_FAILED_GOTO( syncSendEventsToApmServerWithConn( config, connectionData, serializedEvents ) );
backendCommBackoff_onSuccess( &connectionData->backoff );

resultCode = resultSuccess;
finally:
ELASTIC_APM_LOG_DEBUG_RESULT_CODE_FUNCTION_EXIT();
return resultCode;

failure:
backendCommBackoff_onError( &connectionData->backoff );
cleanupConnectionData( connectionData );
goto finally;
}
Expand Down
140 changes: 140 additions & 0 deletions src/ext/backend_comm_backoff.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "backend_comm_backoff.h"
#include <stdlib.h>
#include <math.h>
#include "basic_macros.h"
#include "log.h"
#include "time_util.h"

#define ELASTIC_APM_CURRENT_LOG_CATEGORY ELASTIC_APM_LOG_CATEGORY_BACKEND_COMM

/**
* Algorithm is based on Elastic APM agent spec's "Transport errors" section
*
* @see https://github.com/elastic/apm/blob/d8cb5607dbfffea819ab5efc9b0743044772fb23/specs/agents/transport.md#transport-errors
*/

/**
 * Resets the backoff state to that of "no errors":
 * the sequential errors counter is zeroed and the wait end time is cleared.
 *
 * @param thisObj Backoff state to reset (must be a valid pointer)
 */
void backendCommBackoff_onSuccess( BackendCommBackoff* thisObj )
{
    ELASTIC_APM_ASSERT_VALID_PTR( thisObj );

    const TimeSpec noWaitEndTime = { 0 };

    thisObj->waitEndTime = noWaitEndTime;
    thisObj->errorCount = 0;
}

/**
 * Reads the current time from the clock used for backoff bookkeeping.
 *
 * @param thisObj Not used by this function
 * @param currentTime [out] Receives the current time on success
 * @return true if the current time was obtained, false otherwise (an error is logged in that case)
 */
bool backendCommBackoff_getCurrentTime( BackendCommBackoff* thisObj, /* out */ TimeSpec* currentTime )
{
ResultCode resultCode;
// isRealTime is false so that the non-real-time clock is requested from getClockTimeSpec
ELASTIC_APM_CALL_IF_FAILED_GOTO( getClockTimeSpec( /* isRealTime */ false, /* out */ currentTime ) );

resultCode = resultSuccess;
finally:
ELASTIC_APM_LOG_DEBUG_RESULT_CODE_FUNCTION_EXIT();
// Callers only need to know whether the time was obtained so the result code is collapsed to a bool
return resultCode == resultSuccess;

failure:
ELASTIC_APM_LOG_ERROR( "Failed to get current time" );
goto finally;
}

/**
 * Registers a failed attempt to communicate with the backend:
 * increments the sequential errors counter (up to a cap) and sets the wait end time
 * to the current time plus the delay returned by backendCommBackoff_getTimeToWaitInSeconds.
 *
 * If the current time cannot be obtained the state is reset to that of "no errors".
 *
 * @param thisObj Backoff state to update (must be a valid pointer)
 */
void backendCommBackoff_onError( BackendCommBackoff* thisObj )
{
    ELASTIC_APM_ASSERT_VALID_PTR( thisObj );

    ELASTIC_APM_LOG_DEBUG_FUNCTION_ENTRY();

    /**
     * The grace period should be calculated in seconds using the algorithm min(reconnectCount++, 6) ** 2 ± 10%
     *
     * @see https://github.com/elastic/apm/blob/d8cb5607dbfffea819ab5efc9b0743044772fb23/specs/agents/transport.md#transport-errors
     */
    enum { maxSequentialErrorsCount = 7 };

    if ( thisObj->errorCount < maxSequentialErrorsCount )
    {
        ++thisObj->errorCount;
    }

    if ( ! backendCommBackoff_getCurrentTime( thisObj, /* out */ &thisObj->waitEndTime ) )
    {
        // If we cannot get current time we just reset the state to that of no errors
        backendCommBackoff_onSuccess( thisObj );
        return;
    }

    // The delay is a whole number of seconds so it is added directly to the seconds part.
    // Converting it to nanoseconds first (up to ~39 * 10^9) would not fit into a 32-bit long.
    thisObj->waitEndTime.tv_sec += backendCommBackoff_getTimeToWaitInSeconds( thisObj );
}

/**
 * Maps a random value to a signed jitter.
 *
 * The distance of randomVal from the middle of [0, RAND_MAX] is scaled to [0, jitterHalfRange]
 * and rounded down; the result is negative when randomVal is below the middle.
 *
 * @param randomVal Random value (the scaling is relative to RAND_MAX)
 * @param jitterHalfRange Scale applied to the distance from the middle
 * @return Signed jitter
 */
int backendCommBackoff_convertRandomUIntToJitter( UInt randomVal, UInt jitterHalfRange )
{
    const double halfRandMax = RAND_MAX / 2.0;
    const double offsetFromMiddle = randomVal - halfRandMax;
    const int magnitude = (int) floor( ( jitterHalfRange * fabs( offsetFromMiddle ) ) / halfRandMax );

    if ( offsetFromMiddle < 0 )
    {
        return -magnitude;
    }
    return magnitude;
}

/**
 * Calculates how long to wait before the next attempt to communicate with the backend.
 *
 * The base delay is (errorCount - 1) squared seconds. When 10% of the base delay is at least one second,
 * a random jitter derived from thisObj->generateRandomUInt and scaled by floor(10% of the base delay) is added.
 *
 * @param thisObj Backoff state to read (must be a valid pointer)
 * @return Number of seconds to wait; 0 when there were no errors
 */
UInt backendCommBackoff_getTimeToWaitInSeconds( const BackendCommBackoff* thisObj )
{
    ELASTIC_APM_ASSERT_VALID_PTR( thisObj );

    if ( thisObj->errorCount == 0 )
    {
        return 0;
    }

    UInt reconnectCount = ( thisObj->errorCount - 1 );
    double timeToWaitWithoutJitter = pow( reconnectCount, 2 );
    double jitterHalfRange = timeToWaitWithoutJitter * 0.1;

    // Jitter can be negative so it is kept in a signed variable
    // instead of relying on unsigned wrap-around when it is added to the base delay
    int jitter = 0;
    if ( jitterHalfRange >= 1 )
    {
        UInt randomVal = thisObj->generateRandomUInt( thisObj->generateRandomUIntCtx );
        jitter = backendCommBackoff_convertRandomUIntToJitter( randomVal, (UInt) floor( jitterHalfRange ) );
    }

    return (UInt)( (int)( round( timeToWaitWithoutJitter ) ) + jitter );
}

#pragma clang diagnostic push
#pragma ide diagnostic ignored "UnusedParameter"
UInt backendCommBackoff_defaultGenerateRandomUInt( void* ctx )
#pragma clang diagnostic pop
{
return (UInt) rand(); // NOLINT(cert-msc50-cpp)
}

/**
 * Tells whether the backoff wait time is still in effect.
 *
 * @param thisObj Backoff state; it is reset to "no errors" if the current time cannot be obtained
 * @return true if there were errors and the wait end time is still in the future, false otherwise
 */
bool backendCommBackoff_shouldWait( BackendCommBackoff* thisObj )
{
    // Without errors there is nothing to wait for
    if ( thisObj->errorCount == 0 )
    {
        return false;
    }

    TimeSpec now;
    const bool isNowKnown = backendCommBackoff_getCurrentTime( thisObj, /* out */ &now );
    if ( ! isNowKnown )
    {
        // If we cannot get current time we just reset the state to that of no errors
        backendCommBackoff_onSuccess( thisObj );
        return false;
    }

    const bool isWaitEndInFuture = compareAbsTimeSpecs( &thisObj->waitEndTime, &now ) > 0;
    if ( isWaitEndInFuture )
    {
        char txtOutStreamBuf[ ELASTIC_APM_TEXT_OUTPUT_STREAM_ON_STACK_BUFFER_SIZE ];
        TextOutputStream txtOutStream = ELASTIC_APM_TEXT_OUTPUT_STREAM_FROM_STATIC_BUFFER( txtOutStreamBuf );
        ELASTIC_APM_LOG_TRACE( "Left to wait: %s, errorCount: %u", streamTimeSpecDiff( &now, &thisObj->waitEndTime, &txtOutStream ), thisObj->errorCount );
    }

    return isWaitEndInFuture;
}
52 changes: 52 additions & 0 deletions src/ext/backend_comm_backoff.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include "basic_types.h"
#include "time_util.h"

/**
 * Function that returns a random value; used by the backoff to calculate jitter.
 * ctx is an opaque pointer passed through from BackendCommBackoff::generateRandomUIntCtx.
 */
typedef UInt (* GenerateRandomUInt )( void* ctx );

/**
 * State of the backoff applied to communication with the backend.
 */
struct BackendCommBackoff
{
// Source of random values for jitter
GenerateRandomUInt generateRandomUInt;
// Opaque pointer passed to generateRandomUInt on each call
void* generateRandomUIntCtx;
// Number of sequential errors; 0 means there is nothing to wait for
UInt errorCount;
// Point in time until which backendCommBackoff_shouldWait reports that sending should wait
TimeSpec waitEndTime;
};
typedef struct BackendCommBackoff BackendCommBackoff;

/** Resets the state to that of "no errors" */
void backendCommBackoff_onSuccess( BackendCommBackoff* thisObj );
/** Registers a failed attempt and sets the time until which sending should wait */
void backendCommBackoff_onError( BackendCommBackoff* thisObj );
/** Returns true if there were errors and the wait end time has not been reached yet */
bool backendCommBackoff_shouldWait( BackendCommBackoff* thisObj );

/** Returns the number of seconds to wait for the current number of sequential errors (0 when there were no errors) */
UInt backendCommBackoff_getTimeToWaitInSeconds( const BackendCommBackoff* thisObj );
/** Maps a random value (scaled relative to RAND_MAX) to a signed jitter scaled by jitterHalfRange */
int backendCommBackoff_convertRandomUIntToJitter( UInt randomVal, UInt jitterHalfRange );
/** Default random values generator: returns the result of rand(); ctx is not used */
UInt backendCommBackoff_defaultGenerateRandomUInt( void* ctx );

/**
 * Initial value for BackendCommBackoff:
 * default random values generator without context, no errors and zeroed wait end time.
 */
#define ELASTIC_APM_DEFAULT_BACKEND_COMM_BACKOFF \
((BackendCommBackoff) \
{ \
.generateRandomUInt = &backendCommBackoff_defaultGenerateRandomUInt, \
.generateRandomUIntCtx = NULL, \
.errorCount = 0, \
.waitEndTime = { 0 } \
}) \
/**/
1 change: 1 addition & 0 deletions src/ext/config.m4
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ if test "$PHP_ELASTIC_APM" != "no"; then
AST_debug.c \
AST_instrumentation.c \
backend_comm.c \
backend_comm_backoff.c \
ConfigManager.c \
elastic_apm.c \
elastic_apm_API.c \
Expand Down
1 change: 1 addition & 0 deletions src/ext/config.w32
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ if (PHP_ELASTIC_APM != 'no') {
"AST_debug.c" + " " +
"AST_instrumentation.c" + " " +
"backend_comm.c" + " " +
"backend_comm_backoff.c" + " " +
"ConfigManager.c" + " " +
"elastic_apm.c" + " " +
"elastic_apm_API.c" + " " +
Expand Down
2 changes: 1 addition & 1 deletion src/ext/log.c
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ static void openAndAppendToFile( Logger* logger, String text )
{
size_t textLen = strlen( text );

// TODO: Sergey Kleyman: Uncomment: Fix lower level system calls - "open" and "write" to get stronger guarantee
// OLD TODO: Uncomment: Fix lower level system calls - "open" and "write" to get stronger guarantee
//#ifdef PHP_WIN32
FILE* file = fopen( logger->config.file, "a" );
if ( file == NULL )
Expand Down
34 changes: 32 additions & 2 deletions src/ext/time_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ Int64 durationToMilliseconds( Duration duration )
#pragma clang diagnostic push
#pragma ide diagnostic ignored "ConstantFunctionResult"
#endif
ResultCode getCurrentAbsTimeSpec( TimeSpec* currentAbsTimeSpec )
ResultCode getClockTimeSpec( bool isRealTime, /* out */ TimeSpec* currentAbsTimeSpec )
{
ResultCode resultCode;

#ifdef PHP_WIN32
ELASTIC_APM_SET_RESULT_CODE_AND_GOTO_FAILURE();
#else // #ifdef PHP_WIN32
int clock_gettime_retVal = clock_gettime( CLOCK_REALTIME, currentAbsTimeSpec );
int clock_gettime_retVal = clock_gettime( isRealTime ? CLOCK_REALTIME : CLOCK_MONOTONIC, currentAbsTimeSpec );
if ( clock_gettime_retVal != 0 )
{
int clock_gettime_errno = errno;
Expand All @@ -110,6 +110,11 @@ ResultCode getCurrentAbsTimeSpec( TimeSpec* currentAbsTimeSpec )
#pragma clang diagnostic pop
#endif

/**
 * Gets the current time of the real-time clock.
 * Delegates to getClockTimeSpec with isRealTime set to true.
 *
 * @param currentAbsTimeSpec [out] Receives the current time
 * @return Result code returned by getClockTimeSpec
 */
ResultCode getCurrentAbsTimeSpec( /* out */ TimeSpec* currentAbsTimeSpec )
{
    const bool isRealTime = true;

    return getClockTimeSpec( isRealTime, /* out */ currentAbsTimeSpec );
}

void addDelayToAbsTimeSpec( /* in, out */ TimeSpec* absTimeSpec, long delayInNanoseconds )
{
ELASTIC_APM_ASSERT_VALID_PTR( absTimeSpec );
Expand Down Expand Up @@ -231,6 +236,31 @@ String streamUtcTimeSpecAsLocal( const TimeSpec* utcTimeSpec, TextOutputStream*
return streamUtcTimeValAsLocal( &utcTimeVal, txtOutStream );
}

/**
 * Writes the difference between two points in time as text in the form `<seconds>s' or `<seconds>s <nanoseconds>ns'.
 * The text is prefixed with `-' when fromTimeSpec is later than toTimeSpec.
 *
 * NOTE(review): the borrow logic assumes both tv_nsec values are normalized (less than one second) - confirm.
 *
 * @param fromTimeSpec Start of the interval (must be a valid pointer)
 * @param toTimeSpec End of the interval (must be a valid pointer)
 * @param txtOutStream Stream to which the text is written (must be a valid pointer)
 * @return Value returned by streamPrintf for the written text
 */
String streamTimeSpecDiff( const TimeSpec* fromTimeSpec, const TimeSpec* toTimeSpec, TextOutputStream* txtOutStream )
{
    ELASTIC_APM_ASSERT_VALID_PTR( fromTimeSpec );
    ELASTIC_APM_ASSERT_VALID_PTR( toTimeSpec );
    ELASTIC_APM_ASSERT_VALID_PTR( txtOutStream );

    // Always subtract the earlier time from the later one and print the sign separately
    bool isDiffNegative = compareAbsTimeSpecs( fromTimeSpec, toTimeSpec ) > 0;
    const TimeSpec* from = isDiffNegative ? toTimeSpec : fromTimeSpec;
    const TimeSpec* to = isDiffNegative ? fromTimeSpec : toTimeSpec;
    const char* sign = isDiffNegative ? "-" : "";

    UInt64 diffSecondsPart = to->tv_sec - from->tv_sec;
    Int64 diffNanosecondsPart = to->tv_nsec - from->tv_nsec;
    if ( diffNanosecondsPart < 0 )
    {
        // Borrow one second to make the nanoseconds part non-negative
        --diffSecondsPart;
        diffNanosecondsPart += ELASTIC_APM_NUMBER_OF_NANOSECONDS_IN_SECOND;
    }

    if ( diffNanosecondsPart == 0 )
    {
        // The sign is printed in this form as well so that a negative whole-seconds difference is not shown as positive
        return streamPrintf( txtOutStream, "%s%"PRIu64"s", sign, diffSecondsPart );
    }

    // The result must be returned - falling off the end of a non-void function whose value is used is undefined behavior
    return streamPrintf( txtOutStream, "%s%"PRIu64"s %"PRId64"ns", sign, diffSecondsPart, diffNanosecondsPart );
}

int compareAbsTimeSpecs( const TimeSpec* a, const TimeSpec* b )
{
#define ELASTIC_APM_NUMCMP( a, b ) ( (a) < (b) ? -1 : ( (a) > (b) ? 1 : 0 ) )
Expand Down
Loading

0 comments on commit f1c0dac

Please sign in to comment.