Skip to content

Commit

Permalink
SNOW-878098: Retry Strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
Harry Xi authored Oct 26, 2023
2 parents d6e0609 + 1001bf0 commit 5f5cf96
Show file tree
Hide file tree
Showing 9 changed files with 304 additions and 47 deletions.
12 changes: 11 additions & 1 deletion include/snowflake/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,17 @@ extern "C" {
/**
* Login timeout in seconds
*/
#define SF_LOGIN_TIMEOUT 120
#define SF_LOGIN_TIMEOUT 300

/**
* Network timeout for requests other than login requests
*/
#define SF_NETWORK_TIMEOUT 120

/**
* max retry number for login requests (login/authenticator/token)
*/
#define SF_LOGIN_MAX_RETRY 7

/**
* Default JWT timeout in seconds
Expand Down
2 changes: 1 addition & 1 deletion lib/chunk_downloader.c
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ sf_bool STDCALL download_chunk(char *url, SF_HEADER *headers,
non_json_resp, DEFAULT_SNOWFLAKE_REQUEST_TIMEOUT,
SF_BOOLEAN_TRUE, error, insecure_mode, 0,
0, 0, NULL, NULL, NULL, SF_BOOLEAN_FALSE,
proxy, no_proxy, SF_BOOLEAN_FALSE)) {
proxy, no_proxy, SF_BOOLEAN_FALSE, SF_BOOLEAN_FALSE)) {
// Error set in perform function
goto cleanup;
}
Expand Down
16 changes: 12 additions & 4 deletions lib/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ SF_CONNECT *STDCALL snowflake_init() {
sf->token = NULL;
sf->master_token = NULL;
sf->login_timeout = SF_LOGIN_TIMEOUT;
sf->network_timeout = 0;
sf->network_timeout = SF_NETWORK_TIMEOUT;
sf->sequence_counter = 0;
_mutex_init(&sf->mutex_sequence_counter);
sf->request_id[0] = '\0';
Expand All @@ -695,7 +695,7 @@ SF_CONNECT *STDCALL snowflake_init() {
sf->directURL = NULL;
sf->direct_query_token = NULL;
sf->retry_on_curle_couldnt_connect_count = 0;
sf->retry_on_connect_count = 0;
sf->retry_on_connect_count = SF_LOGIN_MAX_RETRY;

sf->qcc_capacity = QCC_CAPACITY_DEF;
sf->qcc_disable = SF_BOOLEAN_FALSE;
Expand Down Expand Up @@ -1054,9 +1054,13 @@ SF_STATUS STDCALL snowflake_set_attribute(
break;
case SF_CON_LOGIN_TIMEOUT:
sf->login_timeout = value ? *((int64 *) value) : SF_LOGIN_TIMEOUT;
if (sf->login_timeout < SF_LOGIN_TIMEOUT)
{
sf->login_timeout = SF_LOGIN_TIMEOUT;
}
break;
case SF_CON_NETWORK_TIMEOUT:
sf->network_timeout = value ? *((int64 *) value) : SF_LOGIN_TIMEOUT;
sf->network_timeout = value ? *((int64 *) value) : SF_NETWORK_TIMEOUT;
break;
case SF_CON_AUTOCOMMIT:
sf->autocommit = value ? *((sf_bool *) value) : SF_BOOLEAN_TRUE;
Expand Down Expand Up @@ -1089,7 +1093,11 @@ SF_STATUS STDCALL snowflake_set_attribute(
sf->jwt_cnxn_wait_time = value ? *((int64 *)value) : SF_JWT_CNXN_WAIT_TIME;
break;
case SF_CON_MAX_CON_RETRY:
sf->retry_on_connect_count = value ? *((int8 *)value) : 0;
sf->retry_on_connect_count = value ? *((int8 *)value) : SF_LOGIN_MAX_RETRY;
if (sf->retry_on_connect_count < SF_LOGIN_MAX_RETRY)
{
sf->retry_on_connect_count = SF_LOGIN_MAX_RETRY;
}
break;
case SF_CON_PROXY:
alloc_buffer_and_copy(&sf->proxy, value);
Expand Down
8 changes: 8 additions & 0 deletions lib/client_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#define HEADER_C_API_USER_AGENT_MAX_LEN 256
#define HEADER_DIRECT_QUERY_TOKEN_FORMAT "Authorization: %s"
#define HEADER_SERVICE_NAME_FORMAT "X-Snowflake-Service: %s"
#define HEADER_CLIENT_APP_ID_FORMAT "CLIENT_APP_ID: %s"
#define HEADER_CLIENT_APP_VERSION_FORMAT "CLIENT_APP_VERSION: %s"

#define DEFAULT_SNOWFLAKE_BASE_URL "snowflakecomputing.com"
#define DEFAULT_SNOWFLAKE_REQUEST_TIMEOUT 60
Expand All @@ -26,10 +28,16 @@
#define QUERY_URL "/queries/v1/query-request"
#define RENEW_SESSION_URL "/session/token-request"
#define DELETE_SESSION_URL "/session"
// not used for now but added for URL checking on connection requests
#define AUTHENTICATOR_URL "/session/authenticator-request"

#define URL_PARAM_REQEST_GUID "request_guid="
#define URL_PARAM_RETRY_COUNT "retryCount="
#define URL_PARAM_RETRY_REASON "retryReason="

#define CLIENT_APP_ID_KEY "CLIENT_APP_ID"
#define CLIENT_APP_VERSION_KEY "CLIENT_APP_VERSION"

// having extra size in url buffer for retry context or something else could
// be added in the future.
#define URL_EXTRA_SIZE 256
Expand Down
131 changes: 108 additions & 23 deletions lib/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,11 @@ cJSON *STDCALL create_auth_json_body(SF_CONNECT *sf,

//Create Request Data JSON blob
data = snowflake_cJSON_CreateObject();
snowflake_cJSON_AddStringToObject(data, "CLIENT_APP_ID", int_app_name);
snowflake_cJSON_AddStringToObject(data, CLIENT_APP_ID_KEY, int_app_name);
#ifdef MOCK_ENABLED
snowflake_cJSON_AddStringToObject(data, "CLIENT_APP_VERSION", "0.0.0");
snowflake_cJSON_AddStringToObject(data, CLIENT_APP_VERSION_KEY, "0.0.0");
#else
snowflake_cJSON_AddStringToObject(data, "CLIENT_APP_VERSION", int_app_version);
snowflake_cJSON_AddStringToObject(data, CLIENT_APP_VERSION_KEY, int_app_version);
#endif
snowflake_cJSON_AddStringToObject(data, "ACCOUNT_NAME", sf->account);
snowflake_cJSON_AddStringToObject(data, "LOGIN_NAME", sf->user);
Expand Down Expand Up @@ -330,14 +330,28 @@ sf_bool STDCALL curl_post_call(SF_CONNECT *sf,
// Set to 0
memset(query_code, 0, QUERYCODE_LEN);

int64 timeout;
sf_bool is_login_request = is_login_url(url);
if (SF_BOOLEAN_TRUE == is_login_request)
{
timeout = sf->login_timeout;
if (!add_appinfo_header(sf, header, error)) {
return ret;
}
}
else
{
timeout = sf->network_timeout;
}
do {
if (!http_perform(curl, POST_REQUEST_TYPE, url, header, body, json, NULL,
sf->network_timeout, SF_BOOLEAN_FALSE, error,
timeout, SF_BOOLEAN_FALSE, error,
sf->insecure_mode,
sf->retry_on_curle_couldnt_connect_count,
renew_timeout, retry_max_count, elapsed_time,
retried_count, is_renew, renew_injection,
sf->proxy, sf->no_proxy, sf->include_retry_reason) ||
sf->proxy, sf->no_proxy, sf->include_retry_reason,
is_login_request) ||
!*json) {
// Error is set in the perform function
break;
Expand Down Expand Up @@ -462,7 +476,7 @@ sf_bool STDCALL curl_get_call(SF_CONNECT *sf,
sf->insecure_mode,
sf->retry_on_curle_couldnt_connect_count,
0, 0, NULL, NULL, NULL, SF_BOOLEAN_FALSE,
sf->proxy, sf->no_proxy, SF_BOOLEAN_FALSE) ||
sf->proxy, sf->no_proxy, SF_BOOLEAN_FALSE, SF_BOOLEAN_FALSE) ||
!*json) {
// Error is set in the perform function
break;
Expand Down Expand Up @@ -534,15 +548,17 @@ STDCALL decorrelate_jitter_init(uint32 base, uint32 cap) {
}

uint32
decorrelate_jitter_next_sleep(DECORRELATE_JITTER_BACKOFF *djb, uint32 sleep) {
get_next_sleep_with_jitter(DECORRELATE_JITTER_BACKOFF *djb, uint32 sleep) {
sleep = uimin(sleep, djb->cap);
// Prevents division by 0 when sleep = 1
// and if sleep == 2 the value of sleep time returned never changes.
if(sleep <= 2)
{
sleep = 4;
}
return ((uint32)(sleep/2) + (uint32) (rand() % (sleep/2)));
// (sleep/2) + (random from 0 to sleep) = random from sleep/2 to sleep + sleep/2
// = sleep +-50%
return ((uint32)(sleep/2) + (uint32) (rand() % (sleep + 1)));
}

char * STDCALL encode_url(CURL *curl,
Expand Down Expand Up @@ -874,7 +890,7 @@ json_resp_cb(char *data, size_t size, size_t nmemb, RAW_JSON_BUFFER *raw_json) {

sf_bool STDCALL is_retryable_http_code(long int code) {
return ((code >= 500 && code < 600) || code == 400 || code == 403 ||
code == 408) ? SF_BOOLEAN_TRUE : SF_BOOLEAN_FALSE;
code == 408 || code == 429) ? SF_BOOLEAN_TRUE : SF_BOOLEAN_FALSE;
}

sf_bool STDCALL request(SF_CONNECT *sf,
Expand Down Expand Up @@ -1006,7 +1022,7 @@ sf_bool STDCALL renew_session(CURL *curl, SF_CONNECT *sf, SF_ERROR_STRUCT *error
// Successful call, non-null json, successful success code, data object and session token must all be present
// otherwise set an error
if (!curl_post_call(sf, curl, encoded_url, header, s_body, &json, error,
0, 0, NULL, NULL, NULL, SF_BOOLEAN_FALSE) ||
0, sf->retry_on_connect_count, NULL, NULL, NULL, SF_BOOLEAN_FALSE) ||
!json) {
// Do nothing, let error propagate up from post call
log_error("Curl call failed during renew session");
Expand Down Expand Up @@ -1063,21 +1079,21 @@ void STDCALL retry_ctx_free(RETRY_CONTEXT *retry_ctx) {
SF_FREE(retry_ctx);
}

/**
 * Allocates a retry context and sets its initial state.
 *
 * The context starts with a retry count and retry reason of 0, a sleep time
 * of 1 and a jitter backoff object created by decorrelate_jitter_init(1, 16).
 *
 * @param timeout Value stored in the context's retry_timeout.
 * @return The newly allocated RETRY_CONTEXT, or NULL when the allocation
 *         fails.
 */
RETRY_CONTEXT *STDCALL retry_ctx_init(uint64 timeout) {
    RETRY_CONTEXT *retry_ctx = (RETRY_CONTEXT *) SF_CALLOC(1,
                                                           sizeof(RETRY_CONTEXT));
    // Without this guard a failed allocation is dereferenced just below.
    if (!retry_ctx) {
        return NULL;
    }
    retry_ctx->retry_timeout = timeout;
    retry_ctx->retry_count = 0;
    retry_ctx->retry_reason = 0;
    retry_ctx->sleep_time = 1;
    retry_ctx->djb = decorrelate_jitter_init(1, 16);
    return retry_ctx;
}

uint32 STDCALL retry_ctx_next_sleep(RETRY_CONTEXT *retry_ctx) {
retry_ctx->sleep_time = decorrelate_jitter_next_sleep(retry_ctx->djb, retry_ctx->sleep_time * 2);
uint32 jittered_sleep = get_next_sleep_with_jitter(retry_ctx->djb, retry_ctx->sleep_time);
retry_ctx->sleep_time = retry_ctx->sleep_time * 2;
++retry_ctx->retry_count;
return retry_ctx->sleep_time;

// limit the sleep time within retry timeout
uint32 time_elapsed = time(NULL) - retry_ctx->start_time;
if (time_elapsed >= retry_ctx->retry_timeout)
{
// retry timeout is checked before calling retry_ctx_next_sleep
// so we just get bad timing here, sleep 1 second so the timeout
// can be caught right after
return 1;
}
return uimin(jittered_sleep, (uint32)(retry_ctx->retry_timeout - time_elapsed));
}

sf_bool STDCALL retry_ctx_update_url(RETRY_CONTEXT *retry_ctx,
Expand Down Expand Up @@ -1178,6 +1194,8 @@ SF_HEADER* STDCALL sf_header_create() {
sf_header->header_direct_query_token = NULL;
sf_header->header_service_name = NULL;
sf_header->header_token = NULL;
sf_header->header_app_id = NULL;
sf_header->header_app_version = NULL;
sf_header->use_application_json_accept_type = SF_BOOLEAN_FALSE;
sf_header->renew_session = SF_BOOLEAN_FALSE;
return sf_header;
Expand All @@ -1191,6 +1209,73 @@ void STDCALL sf_header_destroy(SF_HEADER *sf_header) {
SF_FREE(sf_header->header_token);
SF_FREE(sf_header->header_service_name);
SF_FREE(sf_header->header_direct_query_token);
SF_FREE(sf_header->header_app_id);
SF_FREE(sf_header->header_app_version);
curl_slist_free_all(sf_header->header);
SF_FREE(sf_header);
}

/**
 * Reports whether a URL refers to one of the login-related endpoints.
 *
 * The check is a substring search for SESSION_URL, RENEW_SESSION_URL and
 * AUTHENTICATOR_URL, in that order.
 *
 * @param url URL string to inspect; may be NULL.
 * @return SF_BOOLEAN_TRUE when any of the endpoint paths occurs in url,
 *         SF_BOOLEAN_FALSE otherwise (including when url is NULL).
 */
sf_bool is_login_url(const char * url)
{
    const char *const login_paths[] = {
        SESSION_URL,
        RENEW_SESSION_URL,
        AUTHENTICATOR_URL
    };
    const size_t path_count = sizeof(login_paths) / sizeof(login_paths[0]);
    size_t idx;

    if (NULL == url)
    {
        return SF_BOOLEAN_FALSE;
    }

    for (idx = 0; idx < path_count; ++idx)
    {
        if (strstr(url, login_paths[idx]) != NULL)
        {
            return SF_BOOLEAN_TRUE;
        }
    }

    return SF_BOOLEAN_FALSE;
}

/**
 * Adds the CLIENT_APP_ID and CLIENT_APP_VERSION header lines to a header set.
 *
 * Each line is formatted from sf->application_name / sf->application_version,
 * stored in header->header_app_id / header->header_app_version and appended
 * to the curl list header->header. A line whose stored string is already
 * non-NULL is skipped, so calling this again does not add it twice.
 *
 * @param sf     Connection supplying application_name and application_version.
 *               NOTE(review): both are passed to strlen(), so they are assumed
 *               to be non-NULL when the header is first built - confirm
 *               against callers.
 * @param header Header set to extend.
 * @param error  Receives an out-of-memory error when an allocation or a list
 *               append fails.
 * @return SF_BOOLEAN_TRUE on success, SF_BOOLEAN_FALSE on failure.
 */
sf_bool add_appinfo_header(SF_CONNECT *sf, SF_HEADER *header, SF_ERROR_STRUCT *error) {
    sf_bool ret = SF_BOOLEAN_FALSE;
    size_t header_appid_size;
    size_t header_appver_size;
    struct curl_slist *appended;

    // check NULL first to ensure the header won't be added twice
    if (!header->header_app_id)
    {
        // the "%s" in the format (2 chars) is replaced by the value;
        // +1 for the terminating NUL
        header_appid_size = strlen(HEADER_CLIENT_APP_ID_FORMAT) - 2 +
                            strlen(sf->application_name) + 1;
        header->header_app_id = (char *)SF_CALLOC(1, header_appid_size);
        if (!header->header_app_id) {
            SET_SNOWFLAKE_ERROR(error, SF_STATUS_ERROR_OUT_OF_MEMORY,
                                "Ran out of memory trying to create header CLIENT_APP_ID",
                                SF_SQLSTATE_UNABLE_TO_CONNECT);
            goto error;
        }
        sb_sprintf(header->header_app_id, header_appid_size,
                   HEADER_CLIENT_APP_ID_FORMAT, sf->application_name);
        // Keep the existing list pointer until the append is known to have
        // succeeded; overwriting it with NULL would lose every header
        // already in the list. The formatted string stays owned by the
        // header, so the guard above will skip this line on a later call.
        appended = curl_slist_append(header->header, header->header_app_id);
        if (!appended) {
            SET_SNOWFLAKE_ERROR(error, SF_STATUS_ERROR_OUT_OF_MEMORY,
                                "Ran out of memory trying to append header CLIENT_APP_ID",
                                SF_SQLSTATE_UNABLE_TO_CONNECT);
            goto error;
        }
        header->header = appended;
    }

    if (!header->header_app_version)
    {
        header_appver_size = strlen(HEADER_CLIENT_APP_VERSION_FORMAT) - 2 +
                             strlen(sf->application_version) + 1;
        header->header_app_version = (char *)SF_CALLOC(1, header_appver_size);
        if (!header->header_app_version) {
            SET_SNOWFLAKE_ERROR(error, SF_STATUS_ERROR_OUT_OF_MEMORY,
                                "Ran out of memory trying to create header CLIENT_APP_VERSION",
                                SF_SQLSTATE_UNABLE_TO_CONNECT);
            goto error;
        }
        sb_sprintf(header->header_app_version, header_appver_size,
                   HEADER_CLIENT_APP_VERSION_FORMAT, sf->application_version);
        // Same list-pointer handling as for CLIENT_APP_ID above.
        appended = curl_slist_append(header->header, header->header_app_version);
        if (!appended) {
            SET_SNOWFLAKE_ERROR(error, SF_STATUS_ERROR_OUT_OF_MEMORY,
                                "Ran out of memory trying to append header CLIENT_APP_VERSION",
                                SF_SQLSTATE_UNABLE_TO_CONNECT);
            goto error;
        }
        header->header = appended;
    }

    log_trace("Added application infor header");

    ret = SF_BOOLEAN_TRUE;

error:
    return ret;
}
40 changes: 30 additions & 10 deletions lib/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ typedef struct URL_KEY_VALUE {
size_t value_size;
} URL_KEY_VALUE;

// internal definition for backoff time
#define SF_BACKOFF_BASE 1
#define SF_BACKOFF_CAP 16
// to meet the 300 second timeout with 7 retries
// 4 + 8 + 16 + 32 + 64 + 128 + 45
// the CAP of 128 keeps the backoff time at a reasonable value in case
// the customer increases the login timeout and retry number.
#define SF_LOGIN_BACKOFF_BASE 4
#define SF_LOGIN_BACKOFF_CAP 128
/**
* Used to keep track of min and max backoff time for a connection retry
*/
Expand All @@ -131,13 +140,17 @@ typedef struct RETRY_CONTEXT {
uint32 sleep_time;
// Decorrelate Jitter is used to determine sleep time
DECORRELATE_JITTER_BACKOFF *djb;
// start time to track on retry timeout
time_t start_time;
} RETRY_CONTEXT;

typedef struct SF_HEADER {
struct curl_slist *header;
char *header_direct_query_token;
char *header_service_name;
char *header_token;
char *header_app_id;
char *header_app_version;

sf_bool use_application_json_accept_type;
sf_bool renew_session;
Expand Down Expand Up @@ -263,7 +276,7 @@ sf_bool STDCALL curl_get_call(SF_CONNECT *sf, CURL *curl, char *url, SF_HEADER *
* @param sleep Duration of last sleep in seconds.
* @return Number of seconds to sleep.
*/
uint32 decorrelate_jitter_next_sleep(DECORRELATE_JITTER_BACKOFF *djb, uint32 sleep);
uint32 get_next_sleep_with_jitter(DECORRELATE_JITTER_BACKOFF *djb, uint32 sleep);

/**
* Creates a URL that is safe to use with cURL. Caller must free the memory associated with the encoded URL.
Expand Down Expand Up @@ -441,7 +454,8 @@ sf_bool STDCALL http_perform(CURL *curl, SF_REQUEST_TYPE request_type, char *url
int64 *elapsed_time, int8 *retried_count,
sf_bool *is_renew, sf_bool renew_injection,
const char *proxy, const char *no_proxy,
sf_bool include_retry_reason);
sf_bool include_retry_reason,
sf_bool is_login_request);

/**
* Returns true if HTTP code is retryable, false otherwise.
Expand Down Expand Up @@ -512,14 +526,6 @@ void STDCALL reset_curl(CURL *curl);
*/
void STDCALL retry_ctx_free(RETRY_CONTEXT *retry_ctx);

/**
* Creates a retry context object and returns it
*
* @param timeout the initial value to set the context's retry_timeout to
* @return Returns an initialized RETRY_CONTEXT object
*/
RETRY_CONTEXT *STDCALL retry_ctx_init(uint64 timeout);

/**
* Determines next sleep duration for request retry. Sets new sleep duration value in Retry Context.
*
Expand Down Expand Up @@ -569,6 +575,20 @@ void STDCALL sf_header_destroy(SF_HEADER *sf_header);
*/
CURLcode set_curl_proxy(CURL *curl, const char* proxy, const char* no_proxy);

/**
* Determines if the url is a login request, i.e. one sent to
* login-request
* authenticator-request
* token-request
*
* @param url Url string to check
* @return True (1) if it's login request, False (0) if not.
*/
sf_bool is_login_url(const char * url);

// add CLIENT_APP_ID/CLIENT_APP_VERSION in header for login requests
sf_bool add_appinfo_header(SF_CONNECT *sf, SF_HEADER *header, SF_ERROR_STRUCT *error);

#ifdef __cplusplus
}
#endif
Expand Down
Loading

0 comments on commit 5f5cf96

Please sign in to comment.