Skip to content

Commit

Permalink
[add] add network recover for producer
Browse files Browse the repository at this point in the history
  • Loading branch information
shabicheng committed Dec 27, 2017
1 parent ee29a33 commit a0e16fe
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 4 deletions.
9 changes: 9 additions & 0 deletions src/log_producer_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,15 @@ extern log_producer_client * get_log_producer_client(log_producer * producer, co
return client;
}

void log_producer_client_network_recover(log_producer_client * client)
{
if (client == NULL)
{
return;
}
log_producer_manager * manager = ((producer_client_private *)client->private_data)->producer_manager;
manager->networkRecover = 1;
}

log_producer_result log_producer_client_add_log(log_producer_client * client, int32_t kv_count, ...)
{
Expand Down
8 changes: 8 additions & 0 deletions src/log_producer_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ extern log_producer * create_log_producer(log_producer_config * config, on_log_p
*/
extern log_producer * create_log_producer_by_config_file(const char * configFilePath, on_log_producer_send_done_function send_done_function);


/**
* destroy log producer
* @param producer
Expand All @@ -72,6 +73,13 @@ extern void destroy_log_producer(log_producer * producer);
*/
extern log_producer_client * get_log_producer_client(log_producer * producer, const char * config_name);


/**
* force send data when network recover
* @param client
*/
extern void log_producer_client_network_recover(log_producer_client * client);

/**
* add log to producer, this may return LOG_PRODUCER_DROP_ERROR if buffer is full.
* if you care about this log very much, retry when return LOG_PRODUCER_DROP_ERROR.
Expand Down
1 change: 1 addition & 0 deletions src/log_producer_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ typedef struct _log_producer_manager
apr_pool_t * send_pool;
log_producer_config * producer_config;
volatile uint32_t shutdown;
volatile uint32_t networkRecover;
volatile apr_uint32_t totalBufferSize;
log_producer_sender * sender;
apr_byte_t priority;
Expand Down
12 changes: 8 additions & 4 deletions src/log_producer_sender.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ const char* LOGE_UNAUTHORIZED = "Unauthorized";
const char* LOGE_WRITE_QUOTA_EXCEED = "WriteQuotaExceed";
const char* LOGE_SHARD_WRITE_QUOTA_EXCEED = "ShardWriteQuotaExceed";

#define SEND_SLEEP_INTERVAL_MS 30
#define SEND_SLEEP_INTERVAL_MS 100
#define MAX_NETWORK_ERROR_SLEEP_MS 3600000
#define BASE_NETWORK_ERROR_SLEEP_MS 10000
#define BASE_NETWORK_ERROR_SLEEP_MS 1000
#define MAX_QUOTA_ERROR_SLEEP_MS 60000
#define BASE_QUOTA_ERROR_SLEEP_MS 30000
#define BASE_QUOTA_ERROR_SLEEP_MS 3000

#define DROP_FAIL_DATA_TIME_SECOND (3600 * 6)

Expand Down Expand Up @@ -80,7 +80,7 @@ void * log_producer_send_fun(apr_thread_t * thread, void * param)
for (i = 0; i < sleepMs; i += SEND_SLEEP_INTERVAL_MS)
{
apr_sleep(SEND_SLEEP_INTERVAL_MS * 1000);
if (producer_manager->shutdown)
if (producer_manager->shutdown || producer_manager->networkRecover)
{
break;
}
Expand All @@ -92,6 +92,10 @@ void * log_producer_send_fun(apr_thread_t * thread, void * param)
aos_info_log("send fail but shutdown signal received, force exit");
break;
}
if (producer_manager->networkRecover)
{
producer_manager->networkRecover = 0;
}

}while(1);

Expand Down

0 comments on commit a0e16fe

Please sign in to comment.