From 763153fbc5432808a9812ed4304b224600625d56 Mon Sep 17 00:00:00 2001 From: norrislee Date: Wed, 27 Nov 2024 20:08:53 -0800 Subject: [PATCH 01/16] Add async query support --- include/snowflake/client.h | 37 +++++++ lib/client.c | 208 ++++++++++++++++++++++++++++++++++++- lib/client_int.h | 7 +- lib/connection.c | 1 - tests/CMakeLists.txt | 1 + 5 files changed, 247 insertions(+), 7 deletions(-) diff --git a/include/snowflake/client.h b/include/snowflake/client.h index 1dd5f53b30..3139718fc3 100644 --- a/include/snowflake/client.h +++ b/include/snowflake/client.h @@ -294,6 +294,25 @@ typedef enum SF_STMT_ATTRIBUTE { SF_STMT_USER_REALLOC_FUNC } SF_STMT_ATTRIBUTE; +/** + * The query status + */ +typedef enum SF_QUERY_STATUS { + SF_QUERY_STATUS_RUNNING, + SF_QUERY_STATUS_ABORTING, + SF_QUERY_STATUS_SUCCESS, + SF_QUERY_STATUS_FAILED_WITH_ERROR, + SF_QUERY_STATUS_ABORTED, + SF_QUERY_STATUS_QUEUED, + SF_QUERY_STATUS_FAILED_WITH_INCIDENT, + SF_QUERY_STATUS_DISCONNECTED, + SF_QUERY_STATUS_RESUMING_WAREHOUSE, + SF_QUERY_STATUS_QUEUED_REPAIRING_WAREHOUSE, + SF_QUERY_STATUS_RESTARTED, + SF_QUERY_STATUS_BLOCKED, + SF_QUERY_STATUS_NO_DATA +} SF_QUERY_STATUS; + /** * Snowflake Error */ @@ -613,6 +632,16 @@ SF_STATUS STDCALL snowflake_get_attribute( */ SF_STMT *STDCALL snowflake_stmt(SF_CONNECT *sf); +/** + * Creates sf SNOWFLAKE_STMT context for async queries. + * + * @param sf The SF_CONNECT context. + * @param query_id the query id of the async query. + * + * @return sfstmt SNOWFLAKE_STMT context for async queries. + */ +SF_STMT* STDCALL snowflake_async_stmt(SF_CONNECT *sf, const char *query_id); + /** * Frees the memory used by a SF_QUERY_RESULT_CAPTURE struct. * Note that this only frees the struct itself, and *not* the underlying @@ -775,6 +804,14 @@ snowflake_stmt_get_attr(SF_STMT *sfstmt, SF_STMT_ATTRIBUTE type, void **value); */ SF_STATUS STDCALL snowflake_execute(SF_STMT *sfstmt); +/** + * Executes a statement asynchronously. + * @param sfstmt SNOWFLAKE_STMT context. + * + * @return 0 if success, otherwise an errno is returned. + */ +SF_STATUS STDCALL snowflake_async_execute(SF_STMT *sfstmt); + /** * Executes a statement with capture. * @param sfstmt SNOWFLAKE_STMT context. diff --git a/lib/client.c b/lib/client.c index 587cdb1692..593e67dcad 100644 --- a/lib/client.c +++ b/lib/client.c @@ -54,12 +54,173 @@ static SF_STATUS STDCALL _reset_connection_parameters(SF_CONNECT *sf, cJSON *parameters, cJSON *session_info, sf_bool do_validate); +static const char* query_status_names[] = { + "RUNNING", + "ABORTING", + "SUCCESS", + "FAILED_WITH_ERROR", + "ABORTED", + "QUEUED", + "FAILED_WITH_INCIDENT", + "DISCONNECTED", + "RESUMING_WAREHOUSE", + "QUEUED_REPAIRING_WAREHOUSE", + "RESTARTED", + "BLOCKED", + "NO_DATA" +}; + /** * Validate partner application name. * @param application partner application name */ sf_bool validate_application(const char *application); +/** + * Helper function to get SF_QUERY_STATUS given the string representation + * @param query_status the string representation of the query status + */ +SF_QUERY_STATUS get_status_from_string(const char *query_status) { + if (query_status == NULL) { + return SF_QUERY_STATUS_NO_DATA; + } + int idx = 0, last = 0; + for (idx = 0, last = (int)SF_QUERY_STATUS_NO_DATA; idx <= last; ++idx) { + size_t len = strlen(query_status_names[idx]); + if (sf_strncasecmp(query_status_names[idx], query_status, len) == 0) { + return (SF_QUERY_STATUS)idx; + } + } + return SF_QUERY_STATUS_NO_DATA; +} + +/** + * Get the metadata of the query + * @param sf the SF_CONNECT context + * @param query_id the query id + */ +char *get_query_metadata(SF_CONNECT *sf, const char *query_id) { + cJSON *resp = NULL; + cJSON *data = NULL; + cJSON *queries = NULL; + char *s_resp = NULL; + const char *error_msg; + size_t url_size = strlen(QUERY_MONITOR_URL) -2 + strlen(query_id) + 1; + char *status_query = (char*)SF_CALLOC(1, url_size); + sf_sprintf(status_query, url_size, QUERY_MONITOR_URL, query_id); + + if (request(sf, &resp, status_query, NULL, 0, NULL, NULL, + GET_REQUEST_TYPE, &sf->error, SF_BOOLEAN_TRUE, + 0, sf->retry_count, get_retry_timeout(sf), + NULL, NULL, NULL, SF_BOOLEAN_FALSE)) { + + s_resp = snowflake_cJSON_Print(resp); + log_info("Here is JSON response:\n%s", s_resp); + + data = snowflake_cJSON_GetObjectItem(resp, "data"); + + queries = snowflake_cJSON_GetObjectItem(data, "queries"); + cJSON* query = snowflake_cJSON_GetArrayItem(queries, 0); + + char *metadata = snowflake_cJSON_Print(query); + snowflake_cJSON_Delete(resp); + SF_FREE(s_resp); + SF_FREE(status_query); + return metadata; + } + SF_FREE(status_query); + log_trace("Error getting query metadata."); + return NULL; +} + +/** + * Get the status of the query + * @param sf the SF_CONNECT context + * @param query_id the query id + */ +SF_QUERY_STATUS get_query_status(SF_CONNECT *sf, const char *query_id) { + SF_QUERY_STATUS ret = SF_QUERY_STATUS_NO_DATA; + char *metadata = get_query_metadata(sf, query_id); + if (metadata) { + cJSON* metadataJson = snowflake_cJSON_Parse(metadata); + + cJSON* status = snowflake_cJSON_GetObjectItem(metadataJson, "status"); + if (snowflake_cJSON_IsString(status)) + { + char* queryStatus = snowflake_cJSON_GetStringValue(status); + ret = get_status_from_string(queryStatus); + } + snowflake_cJSON_Delete(metadataJson); + } + + return ret; +} + +/** + * Helper function to determine if the query is still running + * @param query_status the query status + */ +sf_bool is_query_still_running(SF_QUERY_STATUS query_status) { + return (query_status == SF_QUERY_STATUS_RUNNING) || + (query_status == SF_QUERY_STATUS_QUEUED) || + (query_status == SF_QUERY_STATUS_RESUMING_WAREHOUSE) || + (query_status == SF_QUERY_STATUS_QUEUED_REPAIRING_WAREHOUSE) || + (query_status == SF_QUERY_STATUS_NO_DATA); +} + +/** + * Get the results of the async query + * @param sfstmt The SF_STMT context + */ +void get_real_results(SF_STMT * sfstmt) { + SF_QUERY_STATUS query_status = get_query_status(sfstmt->connection, sfstmt->sfqid); + int retry = 0; + int no_data_retry = 0; + int no_data_max_retries = 30; + int retry_pattern[] = {1, 1, 2, 3, 4, 8, 10}; + int max_retries = 7; + while (query_status != SF_QUERY_STATUS_SUCCESS) { + if (!is_query_still_running(query_status) && query_status != SF_QUERY_STATUS_SUCCESS) { + log_error("Query status is done running and did not succeed. Status is %s", query_status_names[query_status]); + return; + } + if (query_status == SF_QUERY_STATUS_NO_DATA) { + no_data_retry++; + if (no_data_retry >= no_data_max_retries) { + log_error( + "Cannot retrieve data on the status of this query. No information returned from server for queryID=%s", sfstmt->sfqid); + SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error, + SF_STATUS_ERROR_GENERAL, + "Cannot retrieve data on the status of this query.", + NULL, + sfstmt->sfqid); + return; + } + } + } + int sleep_time = retry_pattern[retry] * 500; +#ifdef _WIN32 + Sleep(sleep_time); +#else + usleep(sleep_time * 1000); +#endif + if (retry < max_retries) { + retry++; + } else { + log_error( + "Cannot retrieve data on the status of this query. Max retries hit with queryID=%s", sfstmt->sfqid); + } + query_status = get_query_status(sfstmt->connection, sfstmt->sfqid); + + char query[1024]; + char* query_template = "select * from table(result_scan('%s'))"; + sf_sprintf(query, strlen(query_template) - 2 + strlen(sfstmt->sfqid) + 1, query_template, sfstmt->sfqid); + SF_STATUS ret = snowflake_query(sfstmt, query, strlen(query)); + if (ret != SF_STATUS_SUCCESS) { + snowflake_propagate_error(sfstmt->connection, sfstmt); + } +} + #define _SF_STMT_TYPE_DML 0x3000 #define _SF_STMT_TYPE_INSERT (_SF_STMT_TYPE_DML + 0x100) #define _SF_STMT_TYPE_UPDATE (_SF_STMT_TYPE_DML + 0x200) @@ -1576,6 +1737,33 @@ SF_STMT *STDCALL snowflake_stmt(SF_CONNECT *sf) { return sfstmt; } +SF_STMT *STDCALL snowflake_async_stmt(SF_CONNECT *sf, const char *query_id) { + if (!sf) { + return NULL; + } + + SF_STMT *sfstmt = (SF_STMT *)SF_CALLOC(1, sizeof(SF_STMT)); + if (sfstmt) { + _snowflake_stmt_reset(sfstmt); + sfstmt->connection = sf; + sf_strcpy(sfstmt->sfqid, SF_UUID4_LEN, query_id); + } + + get_real_results(sfstmt); + + char *metadata_str = get_query_metadata(sfstmt->connection, query_id); + if (metadata_str) { + cJSON* metadata = snowflake_cJSON_Parse(metadata_str); + cJSON* stats = snowflake_cJSON_GetObjectItem(metadata, "stats"); + if (snowflake_cJSON_IsObject(stats)) { + _snowflake_stmt_row_metadata_reset(sfstmt); + sfstmt->stats = set_stats(stats); + } + } + + return sfstmt; +} + /** * Initializes an SF_QUERY_RESPONSE_CAPTURE struct. * Note that these need to be released by calling snowflake_query_result_capture_term(). @@ -1954,21 +2142,26 @@ snowflake_prepare(SF_STMT *sfstmt, const char *command, size_t command_size) { SF_STATUS STDCALL snowflake_describe_with_capture(SF_STMT *sfstmt, SF_QUERY_RESULT_CAPTURE *result_capture) { - return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), result_capture, SF_BOOLEAN_TRUE); + return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), result_capture, SF_BOOLEAN_TRUE, SF_BOOLEAN_FALSE); } SF_STATUS STDCALL snowflake_execute(SF_STMT *sfstmt) { - return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), NULL, SF_BOOLEAN_FALSE); + return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), NULL, SF_BOOLEAN_FALSE, SF_BOOLEAN_FALSE); +} + +SF_STATUS STDCALL snowflake_async_execute(SF_STMT *sfstmt) { + return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), NULL, SF_BOOLEAN_FALSE, SF_BOOLEAN_TRUE); } SF_STATUS STDCALL snowflake_execute_with_capture(SF_STMT *sfstmt, SF_QUERY_RESULT_CAPTURE *result_capture) { - return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), result_capture, SF_BOOLEAN_FALSE); + return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), result_capture, SF_BOOLEAN_FALSE, SF_BOOLEAN_FALSE); } SF_STATUS STDCALL _snowflake_execute_ex(SF_STMT *sfstmt, sf_bool is_put_get_command, SF_QUERY_RESULT_CAPTURE* result_capture, - sf_bool is_describe_only) { + sf_bool is_describe_only, + sf_bool is_async_exec) { if (!sfstmt) { return SF_STATUS_ERROR_STATEMENT_NOT_EXIST; } @@ -2073,6 +2266,13 @@ SF_STATUS STDCALL _snowflake_execute_ex(SF_STMT *sfstmt, body = create_query_json_body(sfstmt->sql_text, sfstmt->sequence_counter, is_string_empty(sfstmt->connection->directURL) ? NULL : sfstmt->request_id, is_describe_only); + + if (is_async_exec) { + snowflake_cJSON_AddBoolToObject(body, "asyncExec", SF_BOOLEAN_TRUE); + } else { + snowflake_cJSON_AddBoolToObject(body, "asyncExec", SF_BOOLEAN_FALSE); + } + if (bindings != NULL) { /* binding parameters if exists */ snowflake_cJSON_AddItemToObject(body, "bindings", bindings); diff --git a/lib/client_int.h b/lib/client_int.h index ed6b46c5a2..6fa0d2a7a1 100644 --- a/lib/client_int.h +++ b/lib/client_int.h @@ -29,6 +29,7 @@ #define QUERY_URL "/queries/v1/query-request" #define RENEW_SESSION_URL "/session/token-request" #define DELETE_SESSION_URL "/session" +#define QUERY_MONITOR_URL "/monitoring/queries/%s" // not used for now but add for URL checking on connection requests #define AUTHENTICATOR_URL "/session/authenticator-request" @@ -141,15 +142,17 @@ SF_PUT_GET_RESPONSE *STDCALL sf_put_get_response_allocate(); * @param sfstmt SNOWFLAKE_STMT context. * @param sf_use_application_json_accept type true if this is a put/get command * @param raw_response_buffer optional pointer to an SF_QUERY_RESULT_CAPTURE, - * @param is_describe_only should the statement be executed in describe only mode * if the query response is to be captured. + * @param is_describe_only should the statement be executed in describe only mode + * @param is_async_exec should it execute asynchronously * * @return 0 if success, otherwise an errno is returned. */ SF_STATUS STDCALL _snowflake_execute_ex(SF_STMT *sfstmt, sf_bool use_application_json_accept_type, struct SF_QUERY_RESULT_CAPTURE* result_capture, - sf_bool is_describe_only); + sf_bool is_describe_only, + sf_bool is_async_exec); /** * @return true if this is a put/get command, otherwise false diff --git a/lib/connection.c b/lib/connection.c index 94661ac6d9..d6588eb655 100644 --- a/lib/connection.c +++ b/lib/connection.c @@ -212,7 +212,6 @@ cJSON *STDCALL create_query_json_body(const char *sql_text, int64 sequence_id, c #endif body = snowflake_cJSON_CreateObject(); snowflake_cJSON_AddStringToObject(body, "sqlText", sql_text); - snowflake_cJSON_AddBoolToObject(body, "asyncExec", SF_BOOLEAN_FALSE); snowflake_cJSON_AddNumberToObject(body, "sequenceId", (double) sequence_id); snowflake_cJSON_AddNumberToObject(body, "querySubmissionTime", submission_time); snowflake_cJSON_AddBoolToObject(body, "describeOnly", is_describe_only); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index b4a8fe9fc4..151bda5d64 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -42,6 +42,7 @@ SET(TESTS_C test_get_describe_only_query_result test_stmt_functions test_unit_oauth + test_async test_unit_mfa_auth test_ocsp_fail_open # FEATURE_INCREASED_MAX_LOB_SIZE_IN_MEMORY is internal switch From 974613b01017c542bb36d5df2878f1c6406a9f2d Mon Sep 17 00:00:00 2001 From: norrislee Date: Thu, 28 Nov 2024 09:18:45 -0800 Subject: [PATCH 02/16] Add async test file --- tests/test_async.c | 152 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 152 insertions(+) create mode 100644 tests/test_async.c diff --git a/tests/test_async.c b/tests/test_async.c new file mode 100644 index 0000000000..ef2cf10e14 --- /dev/null +++ b/tests/test_async.c @@ -0,0 +1,152 @@ +/* + * Copyright (c) 2024 Snowflake Computing, Inc. All rights reserved. + */ +#include +#include "memory.h" +#include "utils/test_setup.h" + + /** + * Test normal query flow with async + */ +void test_select(void **unused) { + SF_CONNECT* sf = setup_snowflake_connection(); + SF_STATUS status = snowflake_connect(sf); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sf->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + + /* query */ + SF_STMT* sfstmt = snowflake_stmt(sf); + status = snowflake_prepare(sfstmt, "select 1;", 0); + assert_int_equal(status, SF_STATUS_SUCCESS); + status = snowflake_async_execute(sfstmt); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sfstmt->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + + /* get results */ + int64 out = 0; + assert_int_equal(snowflake_num_rows(sfstmt), 1); + + int counter = 0; + while ((status = snowflake_fetch(sfstmt)) == SF_STATUS_SUCCESS) { + snowflake_column_as_int64(sfstmt, 1, &out); + assert_int_equal(out, 1); + ++counter; + } + if (status != SF_STATUS_EOF) { + dump_error(&(sfstmt->error)); + } + assert_int_equal(status, SF_STATUS_EOF); + snowflake_stmt_term(sfstmt); + snowflake_term(sf); +} + +/** + * Test async with new connection + */ +void test_new_connection(void** unused) { + SF_CONNECT* sf = setup_snowflake_connection(); + SF_STATUS status = snowflake_connect(sf); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sf->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + + /* query */ + SF_STMT* sfstmt = snowflake_stmt(sf); + status = snowflake_prepare(sfstmt, "select 1;", 0); + assert_int_equal(status, SF_STATUS_SUCCESS); + status = snowflake_async_execute(sfstmt); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sfstmt->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + + char* sfqid = (char*)SF_CALLOC(1, SF_UUID4_LEN); + sf_strcpy(sfqid, SF_UUID4_LEN, sfstmt->sfqid); + + snowflake_stmt_term(sfstmt); + snowflake_term(sf); + + /* new connection */ + sf = setup_snowflake_connection(); + status = snowflake_connect(sf); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sf->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + SF_QUERY_STATUS queryStatus = get_query_status(sf, sfqid); + assert_int_equal(queryStatus, SF_QUERY_STATUS_SUCCESS); + + SF_STMT* async_sfstmt = snowflake_async_stmt(sf, sfqid); + + /* get results */ + int64 out = 0; + assert_int_equal(snowflake_num_rows(async_sfstmt), 1); + + int counter = 0; + while ((status = snowflake_fetch(async_sfstmt)) == SF_STATUS_SUCCESS) { + snowflake_column_as_int64(async_sfstmt, 1, &out); + assert_int_equal(out, 1); + ++counter; + } + if (status != SF_STATUS_EOF) { + dump_error(&(async_sfstmt->error)); + } + assert_int_equal(status, SF_STATUS_EOF); +} + +/** + * Test async query with fake table + */ +void test_fake_table(void** unused) { + SF_CONNECT* sf = setup_snowflake_connection(); + SF_STATUS status = snowflake_connect(sf); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sf->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + + /* query */ + SF_STMT* sfstmt = snowflake_stmt(sf); + status = snowflake_prepare(sfstmt, "select * from fake_table;", 0); + assert_int_equal(status, SF_STATUS_SUCCESS); + status = snowflake_async_execute(sfstmt); + assert_int_equal(status, SF_STATUS_ERROR_GENERAL); +} + +/** + * Test async query with fake table + */ +void test_invalid_query_id(void** unused) { + SF_CONNECT* sf = setup_snowflake_connection(); + SF_STATUS status = snowflake_connect(sf); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sf->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + + char* fake_sfqid = "fake-query-id"; + SF_STMT* async_sfstmt = snowflake_async_stmt(sf, fake_sfqid); + + assert_non_null(async_sfstmt); + assert_non_null(async_sfstmt->connection); + assert_string_equal(async_sfstmt->sfqid, fake_sfqid); + assert_null(async_sfstmt->result_set); +} + +int main(void) { + initialize_test(SF_BOOLEAN_FALSE); + const struct CMUnitTest tests[] = { + cmocka_unit_test(test_select), + cmocka_unit_test(test_new_connection), + cmocka_unit_test(test_fake_table), + cmocka_unit_test(test_invalid_query_id), + }; + int ret = cmocka_run_group_tests(tests, NULL, NULL); + snowflake_global_term(); + return ret; +} From a2f270193647563cbe451e1a6c49b32962447ac7 Mon Sep 17 00:00:00 2001 From: norrislee Date: Thu, 28 Nov 2024 10:23:06 -0800 Subject: [PATCH 03/16] Fix linux build by including unistd.h --- lib/client.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/client.c b/lib/client.c index 593e67dcad..c81ebc1afa 100644 --- a/lib/client.c +++ b/lib/client.c @@ -24,6 +24,8 @@ #include #define strncasecmp _strnicmp #define strcasecmp _stricmp +#else +#include #endif #define curl_easier_escape(curl, string) curl_easy_escape(curl, string, 0) From 3829b60a5d35075d4f249092d358145429c3dd50 Mon Sep 17 00:00:00 2001 From: norrislee Date: Thu, 28 Nov 2024 10:59:43 -0800 Subject: [PATCH 04/16] Remove unnecessary test --- tests/test_async.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/test_async.c b/tests/test_async.c index ef2cf10e14..d43fc66e25 100644 --- a/tests/test_async.c +++ b/tests/test_async.c @@ -78,8 +78,6 @@ void test_new_connection(void** unused) { dump_error(&(sf->error)); } assert_int_equal(status, SF_STATUS_SUCCESS); - SF_QUERY_STATUS queryStatus = get_query_status(sf, sfqid); - assert_int_equal(queryStatus, SF_QUERY_STATUS_SUCCESS); SF_STMT* async_sfstmt = snowflake_async_stmt(sf, sfqid); From d94715923ea3848eb328f53d0c1ee841116964a9 Mon Sep 17 00:00:00 2001 From: norrislee Date: Mon, 2 Dec 2024 17:13:14 -0800 Subject: [PATCH 05/16] Add query status to C API, refactor some query parameters, move getting results to when user wants to actually get results --- include/snowflake/client.h | 13 +++- lib/client.c | 134 ++++++++++++++++++++++++------------- tests/test_async.c | 4 +- 3 files changed, 102 insertions(+), 49 deletions(-) diff --git a/include/snowflake/client.h b/include/snowflake/client.h index 3139718fc3..9880538bf1 100644 --- a/include/snowflake/client.h +++ b/include/snowflake/client.h @@ -495,6 +495,8 @@ typedef struct SF_STMT { SF_STATS *stats; void *stmt_attrs; sf_bool is_dml; + sf_bool is_async; + sf_bool is_async_initialized; /** * User realloc function used in snowflake_fetch @@ -640,7 +642,16 @@ SF_STMT *STDCALL snowflake_stmt(SF_CONNECT *sf); * * @return sfstmt SNOWFLAKE_STMT context for async queries. */ -SF_STMT* STDCALL snowflake_async_stmt(SF_CONNECT *sf, const char *query_id); +SF_STMT* STDCALL snowflake_create_async_query_result(SF_CONNECT *sf, const char *query_id); + +/** + * Get the status of a query + * + * @param sfstmt The SF_STMT context. + * + * @return The query status. + */ +SF_QUERY_STATUS STDCALL snowflake_get_query_status(SF_STMT sfstmt); /** * Frees the memory used by a SF_QUERY_RESULT_CAPTURE struct. diff --git a/lib/client.c b/lib/client.c index c81ebc1afa..7ee02bf180 100644 --- a/lib/client.c +++ b/lib/client.c @@ -98,26 +98,28 @@ SF_QUERY_STATUS get_status_from_string(const char *query_status) { /** * Get the metadata of the query - * @param sf the SF_CONNECT context - * @param query_id the query id + * + * @param sfstmt The SF_STMT context. + * + * The query metadata */ -char *get_query_metadata(SF_CONNECT *sf, const char *query_id) { +char *get_query_metadata(SF_STMT* sfstmt) { cJSON *resp = NULL; cJSON *data = NULL; cJSON *queries = NULL; char *s_resp = NULL; const char *error_msg; - size_t url_size = strlen(QUERY_MONITOR_URL) -2 + strlen(query_id) + 1; + size_t url_size = strlen(QUERY_MONITOR_URL) -2 + strlen(sfstmt->sfqid) + 1; char *status_query = (char*)SF_CALLOC(1, url_size); - sf_sprintf(status_query, url_size, QUERY_MONITOR_URL, query_id); + sf_sprintf(status_query, url_size, QUERY_MONITOR_URL, sfstmt->sfqid); - if (request(sf, &resp, status_query, NULL, 0, NULL, NULL, - GET_REQUEST_TYPE, &sf->error, SF_BOOLEAN_TRUE, - 0, sf->retry_count, get_retry_timeout(sf), + if (request(sfstmt->connection, &resp, status_query, NULL, 0, NULL, NULL, + GET_REQUEST_TYPE, &sfstmt->error, SF_BOOLEAN_TRUE, + 0, sfstmt->connection->retry_count, get_retry_timeout(sfstmt->connection), NULL, NULL, NULL, SF_BOOLEAN_FALSE)) { s_resp = snowflake_cJSON_Print(resp); - log_info("Here is JSON response:\n%s", s_resp); + log_trace("Here is JSON response:\n%s", s_resp); data = snowflake_cJSON_GetObjectItem(resp, "data"); @@ -131,29 +133,38 @@ char *get_query_metadata(SF_CONNECT *sf, const char *query_id) { return metadata; } SF_FREE(status_query); - log_trace("Error getting query metadata."); + log_error("Error getting query metadata. Query id: %s", sfstmt->sfqid); return NULL; } -/** - * Get the status of the query - * @param sf the SF_CONNECT context - * @param query_id the query id - */ -SF_QUERY_STATUS get_query_status(SF_CONNECT *sf, const char *query_id) { + +SF_QUERY_STATUS snowflake_get_query_status(SF_STMT *sfstmt) { SF_QUERY_STATUS ret = SF_QUERY_STATUS_NO_DATA; - char *metadata = get_query_metadata(sf, query_id); + char *metadata = get_query_metadata(sfstmt); if (metadata) { cJSON* metadataJson = snowflake_cJSON_Parse(metadata); cJSON* status = snowflake_cJSON_GetObjectItem(metadataJson, "status"); - if (snowflake_cJSON_IsString(status)) - { + if (snowflake_cJSON_IsString(status)) { char* queryStatus = snowflake_cJSON_GetStringValue(status); ret = get_status_from_string(queryStatus); } + else { + SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error, + SF_STATUS_ERROR_GENERAL, + "Error retrieving the status from the metadata.", + NULL, + sfstmt->sfqid); + } snowflake_cJSON_Delete(metadataJson); } + else { + SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error, + SF_STATUS_ERROR_GENERAL, + "Error retrieving query metadata.", + NULL, + sfstmt->sfqid); + } return ret; } @@ -174,8 +185,9 @@ sf_bool is_query_still_running(SF_QUERY_STATUS query_status) { * Get the results of the async query * @param sfstmt The SF_STMT context */ -void get_real_results(SF_STMT * sfstmt) { - SF_QUERY_STATUS query_status = get_query_status(sfstmt->connection, sfstmt->sfqid); +void get_real_results(SF_STMT *sfstmt) { + //Get status until query is complete or timed out + SF_QUERY_STATUS query_status = snowflake_get_query_status(sfstmt); int retry = 0; int no_data_retry = 0; int no_data_max_retries = 30; @@ -183,14 +195,16 @@ void get_real_results(SF_STMT * sfstmt) { int max_retries = 7; while (query_status != SF_QUERY_STATUS_SUCCESS) { if (!is_query_still_running(query_status) && query_status != SF_QUERY_STATUS_SUCCESS) { - log_error("Query status is done running and did not succeed. Status is %s", query_status_names[query_status]); + log_error("Query status is done running and did not succeed. Status is %s", + query_status_names[query_status]); return; } if (query_status == SF_QUERY_STATUS_NO_DATA) { no_data_retry++; if (no_data_retry >= no_data_max_retries) { log_error( - "Cannot retrieve data on the status of this query. No information returned from server for queryID=%s", sfstmt->sfqid); + "Cannot retrieve data on the status of this query. No information returned from server for queryID=%s", + sfstmt->sfqid); SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error, SF_STATUS_ERROR_GENERAL, "Cannot retrieve data on the status of this query.", @@ -199,21 +213,24 @@ void get_real_results(SF_STMT * sfstmt) { return; } } - } - int sleep_time = retry_pattern[retry] * 500; + + int sleep_time = retry_pattern[retry] * 500; #ifdef _WIN32 - Sleep(sleep_time); + Sleep(sleep_time); #else - usleep(sleep_time * 1000); + usleep(sleep_time * 1000); #endif - if (retry < max_retries) { - retry++; - } else { - log_error( - "Cannot retrieve data on the status of this query. Max retries hit with queryID=%s", sfstmt->sfqid); + if (retry < max_retries) { + retry++; + } + else { + log_error( + "Cannot retrieve data on the status of this query. Max retries hit with queryID=%s", sfstmt->sfqid); + } + query_status = snowflake_get_query_status(sfstmt); } - query_status = get_query_status(sfstmt->connection, sfstmt->sfqid); + // Get query results char query[1024]; char* query_template = "select * from table(result_scan('%s'))"; sf_sprintf(query, strlen(query_template) - 2 + strlen(sfstmt->sfqid) + 1, query_template, sfstmt->sfqid); @@ -221,6 +238,19 @@ void get_real_results(SF_STMT * sfstmt) { if (ret != SF_STATUS_SUCCESS) { snowflake_propagate_error(sfstmt->connection, sfstmt); } + + // Get query stats + char* metadata_str = get_query_metadata(sfstmt); + if (metadata_str) { + cJSON* metadata = snowflake_cJSON_Parse(metadata_str); + cJSON* stats = snowflake_cJSON_GetObjectItem(metadata, "stats"); + if (snowflake_cJSON_IsObject(stats)) { + if (sfstmt->stats) { + SF_FREE(sfstmt->stats); + } + sfstmt->stats = set_stats(stats); + } + } } #define _SF_STMT_TYPE_DML 0x3000 @@ -1739,7 +1769,7 @@ SF_STMT *STDCALL snowflake_stmt(SF_CONNECT *sf) { return sfstmt; } -SF_STMT *STDCALL snowflake_async_stmt(SF_CONNECT *sf, const char *query_id) { +SF_STMT *STDCALL snowflake_create_async_query_result(SF_CONNECT *sf, const char *query_id) { if (!sf) { return NULL; } @@ -1749,18 +1779,8 @@ SF_STMT *STDCALL snowflake_async_stmt(SF_CONNECT *sf, const char *query_id) { _snowflake_stmt_reset(sfstmt); sfstmt->connection = sf; sf_strcpy(sfstmt->sfqid, SF_UUID4_LEN, query_id); - } - - get_real_results(sfstmt); - - char *metadata_str = get_query_metadata(sfstmt->connection, query_id); - if (metadata_str) { - cJSON* metadata = snowflake_cJSON_Parse(metadata_str); - cJSON* stats = snowflake_cJSON_GetObjectItem(metadata, "stats"); - if (snowflake_cJSON_IsObject(stats)) { - _snowflake_stmt_row_metadata_reset(sfstmt); - sfstmt->stats = set_stats(stats); - } + sfstmt->is_async = SF_BOOLEAN_TRUE; + sfstmt->is_async_initialized = SF_BOOLEAN_FALSE; } return sfstmt; @@ -1941,6 +1961,11 @@ SF_STATUS STDCALL snowflake_fetch(SF_STMT *sfstmt) { return SF_STATUS_ERROR_STATEMENT_NOT_EXIST; } + if (sfstmt->is_async && !sfstmt->is_async_initialized) { + get_real_results(sfstmt); + sfstmt->is_async_initialized = SF_BOOLEAN_TRUE; + } + clear_snowflake_error(&sfstmt->error); SF_STATUS ret = SF_STATUS_ERROR_GENERAL; sf_bool get_chunk_success = SF_BOOLEAN_TRUE; @@ -2634,6 +2659,11 @@ int64 STDCALL snowflake_num_rows(SF_STMT *sfstmt) { return -1; } + if (sfstmt->is_async && !sfstmt->is_async_initialized) { + get_real_results(sfstmt); + sfstmt->is_async_initialized = SF_BOOLEAN_TRUE; + } + return sfstmt->total_rowcount; } @@ -2641,6 +2671,12 @@ int64 STDCALL snowflake_num_fields(SF_STMT *sfstmt) { if (!sfstmt) { return -1; } + + if (sfstmt->is_async && !sfstmt->is_async_initialized) { + get_real_results(sfstmt); + sfstmt->is_async_initialized = SF_BOOLEAN_TRUE; + } + return sfstmt->total_fieldcount; } @@ -2649,6 +2685,12 @@ uint64 STDCALL snowflake_num_params(SF_STMT *sfstmt) { // TODO change to -1? return 0; } + + if (sfstmt->is_async && !sfstmt->is_async_initialized) { + get_real_results(sfstmt); + sfstmt->is_async_initialized = SF_BOOLEAN_TRUE; + } + ARRAY_LIST *p = (ARRAY_LIST *) sfstmt->params; return p->used; } diff --git a/tests/test_async.c b/tests/test_async.c index d43fc66e25..4cbfd999b2 100644 --- a/tests/test_async.c +++ b/tests/test_async.c @@ -79,7 +79,7 @@ void test_new_connection(void** unused) { } assert_int_equal(status, SF_STATUS_SUCCESS); - SF_STMT* async_sfstmt = snowflake_async_stmt(sf, sfqid); + SF_STMT* async_sfstmt = snowflake_create_async_query_result(sf, sfqid); /* get results */ int64 out = 0; @@ -128,7 +128,7 @@ void test_invalid_query_id(void** unused) { assert_int_equal(status, SF_STATUS_SUCCESS); char* fake_sfqid = "fake-query-id"; - SF_STMT* async_sfstmt = snowflake_async_stmt(sf, fake_sfqid); + SF_STMT* async_sfstmt = snowflake_create_async_query_result(sf, fake_sfqid); assert_non_null(async_sfstmt); assert_non_null(async_sfstmt->connection); From 8ab3758b95c9aecf595599cecaa06a773d000ee6 Mon Sep 17 00:00:00 2001 From: norrislee Date: Mon, 2 Dec 2024 17:19:16 -0800 Subject: [PATCH 06/16] Fix typo --- include/snowflake/client.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/snowflake/client.h b/include/snowflake/client.h index 9880538bf1..03828d595b 100644 --- a/include/snowflake/client.h +++ b/include/snowflake/client.h @@ -651,7 +651,7 @@ SF_STMT* STDCALL snowflake_create_async_query_result(SF_CONNECT *sf, const char * * @return The query status. */ -SF_QUERY_STATUS STDCALL snowflake_get_query_status(SF_STMT sfstmt); +SF_QUERY_STATUS STDCALL snowflake_get_query_status(SF_STMT *sfstmt); /** * Frees the memory used by a SF_QUERY_RESULT_CAPTURE struct. From 43c56531d19a81388b99cc4920f49ff50929f2ed Mon Sep 17 00:00:00 2001 From: norrislee Date: Mon, 2 Dec 2024 17:26:50 -0800 Subject: [PATCH 07/16] Fix typo --- lib/client.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/client.c b/lib/client.c index 7ee02bf180..a0654d759e 100644 --- a/lib/client.c +++ b/lib/client.c @@ -138,7 +138,7 @@ char *get_query_metadata(SF_STMT* sfstmt) { } -SF_QUERY_STATUS snowflake_get_query_status(SF_STMT *sfstmt) { +SF_QUERY_STATUS STDCALL snowflake_get_query_status(SF_STMT *sfstmt) { SF_QUERY_STATUS ret = SF_QUERY_STATUS_NO_DATA; char *metadata = get_query_metadata(sfstmt); if (metadata) { From 76ac1f18a5a308f37f803488abf1c2a1fde9fec2 Mon Sep 17 00:00:00 2001 From: norrislee Date: Wed, 4 Dec 2024 09:17:27 -0800 Subject: [PATCH 08/16] Add more test cases, fix async fetching bugs --- include/snowflake/platform.h | 3 + lib/client.c | 217 ++++++++++++++++++----------------- lib/connection.c | 3 +- lib/http_perform.c | 18 +-- lib/platform.c | 9 ++ tests/test_async.c | 106 ++++++++++++++++- 6 files changed, 231 insertions(+), 125 deletions(-) diff --git a/include/snowflake/platform.h b/include/snowflake/platform.h index 807d6904f0..bc75f85843 100755 --- a/include/snowflake/platform.h +++ b/include/snowflake/platform.h @@ -142,6 +142,9 @@ void STDCALL sf_memory_error_handler(); // this should be called by application before any calls of sfclient void STDCALL sf_exception_on_memory_failure(); +void STDCALL sf_sleep_ms(int sleep_ms); + + #ifdef __cplusplus } #endif diff --git a/lib/client.c b/lib/client.c index a0654d759e..41fcf9f8eb 100644 --- a/lib/client.c +++ b/lib/client.c @@ -133,7 +133,7 @@ char *get_query_metadata(SF_STMT* sfstmt) { return metadata; } SF_FREE(status_query); - log_error("Error getting query metadata. Query id: %s", sfstmt->sfqid); + log_info("No query metadata found. Query id: %s", sfstmt->sfqid); return NULL; } @@ -149,22 +149,8 @@ SF_QUERY_STATUS STDCALL snowflake_get_query_status(SF_STMT *sfstmt) { char* queryStatus = snowflake_cJSON_GetStringValue(status); ret = get_status_from_string(queryStatus); } - else { - SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error, - SF_STATUS_ERROR_GENERAL, - "Error retrieving the status from the metadata.", - NULL, - sfstmt->sfqid); - } snowflake_cJSON_Delete(metadataJson); } - else { - SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error, - SF_STATUS_ERROR_GENERAL, - "Error retrieving query metadata.", - NULL, - sfstmt->sfqid); - } return ret; } @@ -185,7 +171,7 @@ sf_bool is_query_still_running(SF_QUERY_STATUS query_status) { * Get the results of the async query * @param sfstmt The SF_STMT context */ -void get_real_results(SF_STMT *sfstmt) { +sf_bool get_real_results(SF_STMT *sfstmt) { //Get status until query is complete or timed out SF_QUERY_STATUS query_status = snowflake_get_query_status(sfstmt); int retry = 0; @@ -197,7 +183,7 @@ void get_real_results(SF_STMT *sfstmt) { if (!is_query_still_running(query_status) && query_status != SF_QUERY_STATUS_SUCCESS) { log_error("Query status is done running and did not succeed. Status is %s", query_status_names[query_status]); - return; + break; } if (query_status == SF_QUERY_STATUS_NO_DATA) { no_data_retry++; @@ -210,22 +196,19 @@ void get_real_results(SF_STMT *sfstmt) { "Cannot retrieve data on the status of this query.", NULL, sfstmt->sfqid); - return; + break; } } int sleep_time = retry_pattern[retry] * 500; -#ifdef _WIN32 - Sleep(sleep_time); -#else - usleep(sleep_time * 1000); -#endif + sf_sleep_ms(sleep_time); if (retry < max_retries) { retry++; } else { log_error( "Cannot retrieve data on the status of this query. Max retries hit with queryID=%s", sfstmt->sfqid); + break; } query_status = snowflake_get_query_status(sfstmt); } @@ -237,6 +220,7 @@ void get_real_results(SF_STMT *sfstmt) { SF_STATUS ret = snowflake_query(sfstmt, query, strlen(query)); if (ret != SF_STATUS_SUCCESS) { snowflake_propagate_error(sfstmt->connection, sfstmt); + return SF_BOOLEAN_FALSE; } // Get query stats @@ -251,6 +235,7 @@ void get_real_results(SF_STMT *sfstmt) { sfstmt->stats = set_stats(stats); } } + return SF_BOOLEAN_TRUE; } #define _SF_STMT_TYPE_DML 0x3000 @@ -1962,8 +1947,12 @@ SF_STATUS STDCALL snowflake_fetch(SF_STMT *sfstmt) { } if (sfstmt->is_async && !sfstmt->is_async_initialized) { - get_real_results(sfstmt); - sfstmt->is_async_initialized = SF_BOOLEAN_TRUE; + if (get_real_results(sfstmt)) { + sfstmt->is_async_initialized = SF_BOOLEAN_TRUE; + } + else { + return SF_STATUS_ERROR_GENERAL; + } } clear_snowflake_error(&sfstmt->error); @@ -2177,6 +2166,9 @@ SF_STATUS STDCALL snowflake_execute(SF_STMT *sfstmt) { } SF_STATUS STDCALL snowflake_async_execute(SF_STMT *sfstmt) { + if (!sfstmt->is_async) { + sfstmt->is_async = SF_BOOLEAN_TRUE; + } return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), NULL, SF_BOOLEAN_FALSE, SF_BOOLEAN_TRUE); } @@ -2463,124 +2455,127 @@ SF_STATUS STDCALL _snowflake_execute_ex(SF_STMT *sfstmt, // Determine query result format and detach rowset object from data. cJSON * qrf = snowflake_cJSON_GetObjectItem(data, "queryResultFormat"); - char * qrf_str = snowflake_cJSON_GetStringValue(qrf); - sfstmt->qrf = SF_CALLOC(1, sizeof(QueryResultFormat_t)); - cJSON * rowset = NULL; + if (qrf) { + char* qrf_str = snowflake_cJSON_GetStringValue(qrf); + sfstmt->qrf = SF_CALLOC(1, sizeof(QueryResultFormat_t)); + cJSON* rowset = NULL; - if (strcmp(qrf_str, "arrow") == 0 || strcmp(qrf_str, "arrow_force") == 0) { + if (strcmp(qrf_str, "arrow") == 0 || strcmp(qrf_str, "arrow_force") == 0) { #ifdef SF_WIN32 SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error, SF_STATUS_ERROR_UNSUPPORTED_QUERY_RESULT_FORMAT, - "Query results were fetched using Arrow, " - "but the client library does not yet support decoding Arrow results", "", - sfstmt->sfqid); + "Query results were fetched using Arrow, " + "but the client library does not yet support decoding Arrow results", "", + sfstmt->sfqid); return SF_STATUS_ERROR_UNSUPPORTED_QUERY_RESULT_FORMAT; #endif - *((QueryResultFormat_t *) sfstmt->qrf) = ARROW_FORMAT; + * ((QueryResultFormat_t*)sfstmt->qrf) = ARROW_FORMAT; rowset = snowflake_cJSON_DetachItemFromObject(data, "rowsetBase64"); if (!rowset) { - log_error("No valid rowset found in response"); - SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error, - SF_STATUS_ERROR_BAD_JSON, - "Missing rowset from response. No results found.", - SF_SQLSTATE_APP_REJECT_CONNECTION, - sfstmt->sfqid); - goto cleanup; + log_error("No valid rowset found in response"); + SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error, + SF_STATUS_ERROR_BAD_JSON, + "Missing rowset from response. No results found.", + SF_SQLSTATE_APP_REJECT_CONNECTION, + sfstmt->sfqid); + goto cleanup; } - } - else if (strcmp(qrf_str, "json") == 0) { - *((QueryResultFormat_t *) sfstmt->qrf) = JSON_FORMAT; - if (json_detach_array_from_object((cJSON **)(&rowset), data, "rowset")) + } + else if (strcmp(qrf_str, "json") == 0) { + *((QueryResultFormat_t*)sfstmt->qrf) = JSON_FORMAT; + if (json_detach_array_from_object((cJSON**)(&rowset), data, "rowset")) { - log_error("No valid rowset found in response"); - SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error, - SF_STATUS_ERROR_BAD_JSON, - "Missing rowset from response. No results found.", - SF_SQLSTATE_APP_REJECT_CONNECTION, - sfstmt->sfqid); - goto cleanup; + log_error("No valid rowset found in response"); + SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error, + SF_STATUS_ERROR_BAD_JSON, + "Missing rowset from response. No results found.", + SF_SQLSTATE_APP_REJECT_CONNECTION, + sfstmt->sfqid); + goto cleanup; } - } - else { + } + else { log_error("Unsupported query result format: %s", qrf_str); - } + } - // Index starts at 0 and incremented each fetch - sfstmt->total_row_index = 0; + // Index starts at 0 and incremented each fetch + sfstmt->total_row_index = 0; - // When the result set is sufficient large, the server response will contain - // an empty "rowset" object. Instead, it will have a "chunks" object that contains, - // among other fields, a URL from which the result set can be downloaded in chunks. - // In this case, we initialize the chunk downloader, which will download in the - // background as calls to snowflake_fetch() are made. - if ((chunks = snowflake_cJSON_GetObjectItem(data, "chunks")) != NULL) { + // When the result set is sufficient large, the server response will contain + // an empty "rowset" object. Instead, it will have a "chunks" object that contains, + // among other fields, a URL from which the result set can be downloaded in chunks. + // In this case, we initialize the chunk downloader, which will download in the + // background as calls to snowflake_fetch() are made. + if ((chunks = snowflake_cJSON_GetObjectItem(data, "chunks")) != NULL) { // We don't care if there is no qrmk, so ignore return code json_copy_string(&qrmk, data, "qrmk"); chunk_headers = snowflake_cJSON_GetObjectItem(data, "chunkHeaders"); NON_JSON_RESP* (*callback_create_resp)(void) = NULL; - if (ARROW_FORMAT == *((QueryResultFormat_t *)sfstmt->qrf)) { - callback_create_resp = callback_create_arrow_resp; + if (ARROW_FORMAT == *((QueryResultFormat_t*)sfstmt->qrf)) { + callback_create_resp = callback_create_arrow_resp; } sfstmt->chunk_downloader = chunk_downloader_init( - qrmk, - chunk_headers, - chunks, - 2, // thread count - 4, // fetch slot - &sfstmt->error, - sfstmt->connection->insecure_mode, - sfstmt->connection->ocsp_fail_open, - callback_create_resp, - sfstmt->connection->proxy, - sfstmt->connection->no_proxy, - get_retry_timeout(sfstmt->connection), - sfstmt->connection->retry_count); + qrmk, + chunk_headers, + chunks, + 2, // thread count + 4, // fetch slot + &sfstmt->error, + sfstmt->connection->insecure_mode, + sfstmt->connection->ocsp_fail_open, + callback_create_resp, + sfstmt->connection->proxy, + sfstmt->connection->no_proxy, + get_retry_timeout(sfstmt->connection), + sfstmt->connection->retry_count); if (!sfstmt->chunk_downloader) { - // Unable to create chunk downloader. - // Error is set in chunk_downloader_init function. - goto cleanup; + // Unable to create chunk downloader. + // Error is set in chunk_downloader_init function. + goto cleanup; } // Even when the result set is split into chunks, JSON format will still // response with the first chunk in "rowset", so be sure to include it. sfstmt->result_set = rs_create_with_json_result( - rowset, - sfstmt->desc, - (QueryResultFormat_t *)sfstmt->qrf, - sfstmt->connection->timezone); + rowset, + sfstmt->desc, + (QueryResultFormat_t*)sfstmt->qrf, + sfstmt->connection->timezone); // Update chunk row count. Controls the chunk downloader. sfstmt->chunk_rowcount = rs_get_row_count_in_chunk( - sfstmt->result_set, - (QueryResultFormat_t *) sfstmt->qrf); + sfstmt->result_set, + (QueryResultFormat_t*)sfstmt->qrf); // Update total row count. Used in snowflake_num_rows(). if (json_copy_int(&sfstmt->total_rowcount, data, "total")) { - log_warn( - "No total count found in response. Reverting to using array size of results"); - sfstmt->total_rowcount = sfstmt->chunk_rowcount; + log_warn( + "No total count found in response. Reverting to using array size of results"); + sfstmt->total_rowcount = sfstmt->chunk_rowcount; } - } else { + } + else { // Create a result set object and update the total rowcount. sfstmt->result_set = rs_create_with_json_result( - rowset, - sfstmt->desc, - (QueryResultFormat_t *) sfstmt->qrf, - sfstmt->connection->timezone); + rowset, + sfstmt->desc, + (QueryResultFormat_t*)sfstmt->qrf, + sfstmt->connection->timezone); // Update chunk row count. Controls the chunk downloader. sfstmt->chunk_rowcount = rs_get_row_count_in_chunk( - sfstmt->result_set, - (QueryResultFormat_t *) sfstmt->qrf); + sfstmt->result_set, + (QueryResultFormat_t*)sfstmt->qrf); // Update total row count. Used in snowflake_num_rows(). if (json_copy_int(&sfstmt->total_rowcount, data, "total")) { - log_warn( - "No total count found in response. Reverting to using array size of results"); - sfstmt->total_rowcount = sfstmt->chunk_rowcount; + log_warn( + "No total count found in response. Reverting to using array size of results"); + sfstmt->total_rowcount = sfstmt->chunk_rowcount; } + } } } } else if (json_error != SF_JSON_ERROR_NONE) { @@ -2660,8 +2655,12 @@ int64 STDCALL snowflake_num_rows(SF_STMT *sfstmt) { } if (sfstmt->is_async && !sfstmt->is_async_initialized) { - get_real_results(sfstmt); - sfstmt->is_async_initialized = SF_BOOLEAN_TRUE; + if (get_real_results(sfstmt)) { + sfstmt->is_async_initialized = SF_BOOLEAN_TRUE; + } + else { + return -1; + } } return sfstmt->total_rowcount; @@ -2673,8 +2672,12 @@ int64 STDCALL snowflake_num_fields(SF_STMT *sfstmt) { } if (sfstmt->is_async && !sfstmt->is_async_initialized) { - get_real_results(sfstmt); - sfstmt->is_async_initialized = SF_BOOLEAN_TRUE; + if (get_real_results(sfstmt)) { + sfstmt->is_async_initialized = SF_BOOLEAN_TRUE; + } + else { + return -1; + } } return sfstmt->total_fieldcount; @@ -2687,8 +2690,12 @@ uint64 STDCALL snowflake_num_params(SF_STMT *sfstmt) { } if (sfstmt->is_async && !sfstmt->is_async_initialized) { - get_real_results(sfstmt); - sfstmt->is_async_initialized = SF_BOOLEAN_TRUE; + if (get_real_results(sfstmt)) { + sfstmt->is_async_initialized = SF_BOOLEAN_TRUE; + } + else { + return 0; + } } ARRAY_LIST *p = (ARRAY_LIST *) sfstmt->params; diff --git a/lib/connection.c b/lib/connection.c index d6588eb655..1b728230ae 100644 --- a/lib/connection.c +++ b/lib/connection.c @@ -413,8 +413,7 @@ sf_bool STDCALL curl_post_call(SF_CONNECT *sf, break; } - while (strcmp(query_code, QUERY_IN_PROGRESS_CODE) == 0 || - strcmp(query_code, QUERY_IN_PROGRESS_ASYNC_CODE) == 0) { + while (strcmp(query_code, QUERY_IN_PROGRESS_CODE) == 0) { // Remove old result URL and query code if this isn't our first rodeo SF_FREE(result_url); memset(query_code, 0, QUERYCODE_LEN); diff --git a/lib/http_perform.c b/lib/http_perform.c index b7291e2034..6bf671e349 100644 --- a/lib/http_perform.c +++ b/lib/http_perform.c @@ -36,8 +36,6 @@ dump(const char *text, FILE *stream, unsigned char *ptr, size_t size, static int my_trace(CURL *handle, curl_infotype type, char *data, size_t size, void *userp); -static void my_sleep_ms(uint32 sleepMs); - static void dump(const char *text, FILE *stream, unsigned char *ptr, size_t size, @@ -128,16 +126,6 @@ int my_trace(CURL *handle, curl_infotype type, return 0; } -static -void my_sleep_ms(uint32 sleepMs) -{ -#ifdef _WIN32 - Sleep(sleepMs); -#else - usleep(sleepMs * 1000); // usleep takes sleep time in us (1 millionth of a second) -#endif -} - sf_bool STDCALL http_perform(CURL *curl, SF_REQUEST_TYPE request_type, char *url, @@ -371,7 +359,7 @@ sf_bool STDCALL http_perform(CURL *curl, if ((renew_injection) && (renew_timeout > 0) && elapsed_time && (*elapsed_time <= 0)) { - my_sleep_ms(renew_timeout * 1000); + sf_sleep_ms(renew_timeout * 1000); res = CURLE_OPERATION_TIMEDOUT; } @@ -387,7 +375,7 @@ sf_bool STDCALL http_perform(CURL *curl, "will retry after %d second", curl_retry_ctx.retry_count, next_sleep_in_secs); - my_sleep_ms(next_sleep_in_secs*1000); + sf_sleep_ms(next_sleep_in_secs*1000); } else if ((res == CURLE_OPERATION_TIMEDOUT) && (renew_timeout > 0)) { retry = SF_BOOLEAN_TRUE; } else { @@ -436,7 +424,7 @@ sf_bool STDCALL http_perform(CURL *curl, "will retry after %d seconds", http_code, curl_retry_ctx.retry_count, next_sleep_in_secs); - my_sleep_ms(next_sleep_in_secs * 1000); + sf_sleep_ms(next_sleep_in_secs * 1000); } else { char msg[1024]; diff --git a/lib/platform.c b/lib/platform.c index baf5a0ead3..8c66157016 100755 --- a/lib/platform.c +++ b/lib/platform.c @@ -236,6 +236,15 @@ struct tm* sfchrono_localtime(const time_t *timep, struct tm *tm) } #endif +void STDCALL sf_sleep_ms(int sleep_ms) +{ +#ifdef _WIN32 + Sleep(sleep_ms); +#else + usleep(sleepMs * 1000); // usleep takes sleep time in us (1 millionth of a second) +#endif +} + struct tm *STDCALL sf_gmtime(const time_t *timep, struct tm *result) { #ifdef _WIN32 return sfchrono_gmtime(timep, result); diff --git a/tests/test_async.c b/tests/test_async.c index 4cbfd999b2..11eb7652d6 100644 --- a/tests/test_async.c +++ b/tests/test_async.c @@ -44,6 +44,94 @@ void test_select(void **unused) { snowflake_term(sf); } +/** + * Test normal getting query status + */ +void test_query_status(void** unused) { + SF_CONNECT* sf = setup_snowflake_connection(); + SF_STATUS status = snowflake_connect(sf); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sf->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + + /* query */ + SF_STMT* sfstmt = snowflake_stmt(sf); + status = snowflake_prepare(sfstmt, "select system$wait(5);", 0); + assert_int_equal(status, SF_STATUS_SUCCESS); + status = snowflake_async_execute(sfstmt); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sfstmt->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + + SF_QUERY_STATUS query_status = snowflake_get_query_status(sfstmt); + assert_int_equal(query_status, SF_QUERY_STATUS_RUNNING); + + int retries = 0; + while (query_status != SF_QUERY_STATUS_SUCCESS || retries > 5) { + query_status = snowflake_get_query_status(sfstmt); + sf_sleep_ms(2000); + retries++; + } + + /* get results */ + char *out = NULL; + size_t value_len = 0; + size_t max_value_size = 0; + assert_int_equal(snowflake_num_rows(sfstmt), 1); + + while ((status = snowflake_fetch(sfstmt)) == SF_STATUS_SUCCESS) { + snowflake_column_as_str(sfstmt, 1, &out, &value_len, &max_value_size); + assert_string_equal(out, "waited 5 seconds"); + } + if (status != SF_STATUS_EOF) { + dump_error(&(sfstmt->error)); + } + assert_int_equal(status, SF_STATUS_EOF); + snowflake_stmt_term(sfstmt); + snowflake_term(sf); +} + +/** + * Test premature fetch + */ +void test_premature_fetch(void** unused) { + SF_CONNECT* sf = setup_snowflake_connection(); + SF_STATUS status = snowflake_connect(sf); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sf->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + + /* query */ + SF_STMT* sfstmt = snowflake_stmt(sf); + status = snowflake_prepare(sfstmt, "select system$wait(5);", 0); + assert_int_equal(status, SF_STATUS_SUCCESS); + status = snowflake_async_execute(sfstmt); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sfstmt->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + + /* get results */ + char* out = NULL; + size_t value_len = 0; + size_t max_value_size = 0; + assert_int_equal(snowflake_num_rows(sfstmt), 1); + + while ((status = snowflake_fetch(sfstmt)) == SF_STATUS_SUCCESS) { + snowflake_column_as_str(sfstmt, 1, &out, &value_len, &max_value_size); + assert_string_equal(out, "waited 5 seconds"); + } + if (status != SF_STATUS_EOF) { + dump_error(&(sfstmt->error)); + } + assert_int_equal(status, SF_STATUS_EOF); + snowflake_stmt_term(sfstmt); + snowflake_term(sf); +} + /** * Test async with new connection */ @@ -85,11 +173,9 @@ void test_new_connection(void** unused) { int64 out = 0; assert_int_equal(snowflake_num_rows(async_sfstmt), 1); - int counter = 0; while ((status = snowflake_fetch(async_sfstmt)) == SF_STATUS_SUCCESS) { snowflake_column_as_int64(async_sfstmt, 1, &out); assert_int_equal(out, 1); - ++counter; } if (status != SF_STATUS_EOF) { dump_error(&(async_sfstmt->error)); @@ -113,11 +199,23 @@ void test_fake_table(void** unused) { status = snowflake_prepare(sfstmt, "select * from fake_table;", 0); assert_int_equal(status, SF_STATUS_SUCCESS); status = snowflake_async_execute(sfstmt); + assert_int_equal(status, SF_STATUS_SUCCESS); + + SF_QUERY_STATUS query_status = snowflake_get_query_status(sfstmt); + assert_int_equal(query_status, SF_QUERY_STATUS_FAILED_WITH_ERROR); + + /* get results */ + char* out = NULL; + size_t value_len = 0; + size_t max_value_size = 0; + status = snowflake_fetch(sfstmt); assert_int_equal(status, SF_STATUS_ERROR_GENERAL); + snowflake_stmt_term(sfstmt); + snowflake_term(sf); } /** - * Test async query with fake table + * Test async query with invalid query id */ void test_invalid_query_id(void** unused) { SF_CONNECT* sf = setup_snowflake_connection(); @@ -140,6 +238,8 @@ int main(void) { initialize_test(SF_BOOLEAN_FALSE); const struct CMUnitTest tests[] = { cmocka_unit_test(test_select), + cmocka_unit_test(test_query_status), + cmocka_unit_test(test_premature_fetch), cmocka_unit_test(test_new_connection), cmocka_unit_test(test_fake_table), cmocka_unit_test(test_invalid_query_id), From c4fa76da08496351af3120514f0464b8bf10f289 Mon Sep 17 00:00:00 2001 From: norrislee Date: Wed, 4 Dec 2024 13:09:10 -0800 Subject: [PATCH 09/16] Fix bug with normal queries that go async after a while --- lib/connection.c | 46 ++++++++++++++++++++++++++++------------------ tests/test_async.c | 2 +- 2 files changed, 29 insertions(+), 19 deletions(-) diff --git a/lib/connection.c b/lib/connection.c index 1b728230ae..c4117dbc9e 100644 --- a/lib/connection.c +++ b/lib/connection.c @@ -413,40 +413,50 @@ sf_bool STDCALL curl_post_call(SF_CONNECT *sf, break; } - while (strcmp(query_code, QUERY_IN_PROGRESS_CODE) == 0) { + sf_bool isAsyncExec = SF_BOOLEAN_FALSE; + cJSON *json_body = snowflake_cJSON_Parse(body); + cJSON *async = snowflake_cJSON_GetObjectItem(json_body, "asyncExec"); + if (async && snowflake_cJSON_IsBool(async)) { + isAsyncExec = snowflake_cJSON_IsTrue(async); + } + + if (!isAsyncExec) { + while (strcmp(query_code, QUERY_IN_PROGRESS_CODE) == 0 || + strcmp(query_code, QUERY_IN_PROGRESS_ASYNC_CODE) == 0) { // Remove old result URL and query code if this isn't our first rodeo SF_FREE(result_url); memset(query_code, 0, QUERYCODE_LEN); data = snowflake_cJSON_GetObjectItem(*json, "data"); if (json_copy_string(&result_url, data, "getResultUrl") != - SF_JSON_ERROR_NONE) { - stop = SF_BOOLEAN_TRUE; - JSON_ERROR_MSG(json_error, error_msg, "Result URL"); - SET_SNOWFLAKE_ERROR(error, SF_STATUS_ERROR_BAD_JSON, error_msg, - SF_SQLSTATE_UNABLE_TO_CONNECT); - break; + SF_JSON_ERROR_NONE) { + stop = SF_BOOLEAN_TRUE; + JSON_ERROR_MSG(json_error, error_msg, "Result URL"); + SET_SNOWFLAKE_ERROR(error, SF_STATUS_ERROR_BAD_JSON, error_msg, + SF_SQLSTATE_UNABLE_TO_CONNECT); + break; } log_trace("ping pong starting..."); if (!request(sf, json, result_url, NULL, 0, NULL, header, - GET_REQUEST_TYPE, error, SF_BOOLEAN_FALSE, - 0, retry_max_count, retry_timeout, NULL, NULL, NULL, SF_BOOLEAN_FALSE)) { - // Error came from request up, just break - stop = SF_BOOLEAN_TRUE; - break; + GET_REQUEST_TYPE, error, SF_BOOLEAN_FALSE, + 0, retry_max_count, retry_timeout, NULL, NULL, NULL, SF_BOOLEAN_FALSE)) { + // Error came from request up, just break + stop = SF_BOOLEAN_TRUE; + break; } if ( (json_error = json_copy_string_no_alloc(query_code, *json, "code", - QUERYCODE_LEN)) != + QUERYCODE_LEN)) != SF_JSON_ERROR_NONE && json_error != SF_JSON_ERROR_ITEM_NULL) { - stop = SF_BOOLEAN_TRUE; - JSON_ERROR_MSG(json_error, error_msg, "Query code"); - SET_SNOWFLAKE_ERROR(error, SF_STATUS_ERROR_BAD_JSON, error_msg, - SF_SQLSTATE_UNABLE_TO_CONNECT); - break; + stop = SF_BOOLEAN_TRUE; + JSON_ERROR_MSG(json_error, error_msg, "Query code"); + SET_SNOWFLAKE_ERROR(error, SF_STATUS_ERROR_BAD_JSON, error_msg, + SF_SQLSTATE_UNABLE_TO_CONNECT); + break; } + } } if (stop) { diff --git a/tests/test_async.c b/tests/test_async.c index 11eb7652d6..a49af28888 100644 --- a/tests/test_async.c +++ b/tests/test_async.c @@ -196,7 +196,7 @@ void test_fake_table(void** unused) { /* query */ SF_STMT* sfstmt = snowflake_stmt(sf); - status = snowflake_prepare(sfstmt, "select * from fake_table;", 0); + status = snowflake_prepare(sfstmt, "select * from my_fake_table;", 0); assert_int_equal(status, SF_STATUS_SUCCESS); status = snowflake_async_execute(sfstmt); assert_int_equal(status, SF_STATUS_SUCCESS); From 387a59fac1358fef06c6f41103841ad64b7aae1f Mon Sep 17 00:00:00 2001 From: norrislee Date: Wed, 4 Dec 2024 13:13:16 -0800 Subject: [PATCH 10/16] fix typo --- lib/platform.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/platform.c b/lib/platform.c index 8c66157016..19543be520 100755 --- a/lib/platform.c +++ b/lib/platform.c @@ -241,7 +241,7 @@ void STDCALL sf_sleep_ms(int sleep_ms) #ifdef _WIN32 Sleep(sleep_ms); #else - usleep(sleepMs * 1000); // usleep takes sleep time in us (1 millionth of a second) + usleep(sleep_ms * 1000); // usleep takes sleep time in us (1 millionth of a second) #endif } From cb067ec4bcb4928385af98f541f583bfc94bfcb0 Mon Sep 17 00:00:00 2001 From: norrislee Date: Wed, 4 Dec 2024 13:48:17 -0800 Subject: [PATCH 11/16] Remove status check from fake table --- tests/test_async.c | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/test_async.c b/tests/test_async.c index a49af28888..5b03a37bc5 100644 --- a/tests/test_async.c +++ b/tests/test_async.c @@ -201,9 +201,6 @@ void test_fake_table(void** unused) { status = snowflake_async_execute(sfstmt); assert_int_equal(status, SF_STATUS_SUCCESS); - SF_QUERY_STATUS query_status = snowflake_get_query_status(sfstmt); - assert_int_equal(query_status, SF_QUERY_STATUS_FAILED_WITH_ERROR); - /* get results */ char* out = NULL; size_t value_len = 0; From 582913e7949082ec60d97e477710b70b411c37b2 Mon Sep 17 00:00:00 2001 From: norrislee Date: Tue, 10 Dec 2024 12:44:31 -0800 Subject: [PATCH 12/16] Fix linux warnings --- lib/client.c | 1 - tests/test_async.c | 15 ++++++--------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/lib/client.c b/lib/client.c index e79bae647f..e05d30a2e7 100644 --- a/lib/client.c +++ b/lib/client.c @@ -108,7 +108,6 @@ char *get_query_metadata(SF_STMT* sfstmt) { cJSON *data = NULL; cJSON *queries = NULL; char *s_resp = NULL; - const char *error_msg; size_t url_size = strlen(QUERY_MONITOR_URL) -2 + strlen(sfstmt->sfqid) + 1; char *status_query = (char*)SF_CALLOC(1, url_size); sf_sprintf(status_query, url_size, QUERY_MONITOR_URL, sfstmt->sfqid); diff --git a/tests/test_async.c b/tests/test_async.c index 5b03a37bc5..af1651204d 100644 --- a/tests/test_async.c +++ b/tests/test_async.c @@ -8,7 +8,7 @@ /** * Test normal query flow with async */ -void test_select(void **unused) { +void test_select() { SF_CONNECT* sf = setup_snowflake_connection(); SF_STATUS status = snowflake_connect(sf); if (status != SF_STATUS_SUCCESS) { @@ -47,7 +47,7 @@ void test_select(void **unused) { /** * Test normal getting query status */ -void test_query_status(void** unused) { +void test_query_status() { SF_CONNECT* sf = setup_snowflake_connection(); SF_STATUS status = snowflake_connect(sf); if (status != SF_STATUS_SUCCESS) { @@ -96,7 +96,7 @@ void test_query_status(void** unused) { /** * Test premature fetch */ -void test_premature_fetch(void** unused) { +void test_premature_fetch() { SF_CONNECT* sf = setup_snowflake_connection(); SF_STATUS status = snowflake_connect(sf); if (status != SF_STATUS_SUCCESS) { @@ -135,7 +135,7 @@ void test_premature_fetch(void** unused) { /** * Test async with new connection */ -void test_new_connection(void** unused) { +void test_new_connection() { SF_CONNECT* sf = setup_snowflake_connection(); SF_STATUS status = snowflake_connect(sf); if (status != SF_STATUS_SUCCESS) { @@ -186,7 +186,7 @@ void test_new_connection(void** unused) { /** * Test async query with fake table */ -void test_fake_table(void** unused) { +void test_fake_table() { SF_CONNECT* sf = setup_snowflake_connection(); SF_STATUS status = snowflake_connect(sf); if (status != SF_STATUS_SUCCESS) { @@ -202,9 +202,6 @@ void test_fake_table(void** unused) { assert_int_equal(status, SF_STATUS_SUCCESS); /* get results */ - char* out = NULL; - size_t value_len = 0; - size_t max_value_size = 0; status = snowflake_fetch(sfstmt); assert_int_equal(status, SF_STATUS_ERROR_GENERAL); snowflake_stmt_term(sfstmt); @@ -214,7 +211,7 @@ void test_fake_table(void** unused) { /** * Test async query with invalid query id */ -void test_invalid_query_id(void** unused) { +void test_invalid_query_id() { SF_CONNECT* sf = setup_snowflake_connection(); SF_STATUS status = snowflake_connect(sf); if (status != SF_STATUS_SUCCESS) { From f84fc2571ef88465b206fd7c5a9917ba9fa1e20c Mon Sep 17 00:00:00 2001 From: norrislee Date: Thu, 12 Dec 2024 14:00:01 -0800 Subject: [PATCH 13/16] Improve error handling and logging --- lib/client.c | 22 ++++++++++++---------- lib/connection.c | 42 ++++++++++++++++++++++-------------------- 2 files changed, 34 insertions(+), 30 deletions(-) diff --git a/lib/client.c b/lib/client.c index e05d30a2e7..84142dfa77 100644 --- a/lib/client.c +++ b/lib/client.c @@ -226,13 +226,19 @@ sf_bool get_real_results(SF_STMT *sfstmt) { char* metadata_str = get_query_metadata(sfstmt); if (metadata_str) { cJSON* metadata = snowflake_cJSON_Parse(metadata_str); - cJSON* stats = snowflake_cJSON_GetObjectItem(metadata, "stats"); - if (snowflake_cJSON_IsObject(stats)) { - if (sfstmt->stats) { - SF_FREE(sfstmt->stats); + if (metadata && snowflake_cJSON_IsObject(metadata)) { + cJSON* stats = snowflake_cJSON_GetObjectItem(metadata, "stats"); + if (snowflake_cJSON_IsObject(stats)) { + if (sfstmt->stats) { + SF_FREE(sfstmt->stats); + } + sfstmt->stats = set_stats(stats); } - sfstmt->stats = set_stats(stats); + log_error( + "Error parsing query stats from query id: %s", sfstmt->sfqid); } + log_error( + "Error parsing query metadata from query id: %s", sfstmt->sfqid); } return SF_BOOLEAN_TRUE; } @@ -2285,11 +2291,7 @@ SF_STATUS STDCALL _snowflake_execute_ex(SF_STMT *sfstmt, is_string_empty(sfstmt->connection->directURL) ? NULL : sfstmt->request_id, is_describe_only); - if (is_async_exec) { - snowflake_cJSON_AddBoolToObject(body, "asyncExec", SF_BOOLEAN_TRUE); - } else { - snowflake_cJSON_AddBoolToObject(body, "asyncExec", SF_BOOLEAN_FALSE); - } + snowflake_cJSON_AddBoolToObject(body, "asyncExec", is_async_exec); if (bindings != NULL) { /* binding parameters if exists */ diff --git a/lib/connection.c b/lib/connection.c index c4117dbc9e..b5bf99ef89 100644 --- a/lib/connection.c +++ b/lib/connection.c @@ -415,9 +415,11 @@ sf_bool STDCALL curl_post_call(SF_CONNECT *sf, sf_bool isAsyncExec = SF_BOOLEAN_FALSE; cJSON *json_body = snowflake_cJSON_Parse(body); - cJSON *async = snowflake_cJSON_GetObjectItem(json_body, "asyncExec"); - if (async && snowflake_cJSON_IsBool(async)) { - isAsyncExec = snowflake_cJSON_IsTrue(async); + if (json_body && snowflake_cJSON_IsObject(json_body)) { + cJSON* async = snowflake_cJSON_GetObjectItem(json_body, "asyncExec"); + if (async && snowflake_cJSON_IsBool(async)) { + isAsyncExec = snowflake_cJSON_IsTrue(async); + } } if (!isAsyncExec) { @@ -428,33 +430,33 @@ sf_bool STDCALL curl_post_call(SF_CONNECT *sf, memset(query_code, 0, QUERYCODE_LEN); data = snowflake_cJSON_GetObjectItem(*json, "data"); if (json_copy_string(&result_url, data, "getResultUrl") != - SF_JSON_ERROR_NONE) { - stop = SF_BOOLEAN_TRUE; - JSON_ERROR_MSG(json_error, error_msg, "Result URL"); - SET_SNOWFLAKE_ERROR(error, SF_STATUS_ERROR_BAD_JSON, error_msg, - SF_SQLSTATE_UNABLE_TO_CONNECT); - break; + SF_JSON_ERROR_NONE) { + stop = SF_BOOLEAN_TRUE; + JSON_ERROR_MSG(json_error, error_msg, "Result URL"); + SET_SNOWFLAKE_ERROR(error, SF_STATUS_ERROR_BAD_JSON, error_msg, + SF_SQLSTATE_UNABLE_TO_CONNECT); + break; } log_trace("ping pong starting..."); if (!request(sf, json, result_url, NULL, 0, NULL, header, - GET_REQUEST_TYPE, error, SF_BOOLEAN_FALSE, - 0, retry_max_count, retry_timeout, NULL, NULL, NULL, SF_BOOLEAN_FALSE)) { - // Error came from request up, just break - stop = SF_BOOLEAN_TRUE; - break; + GET_REQUEST_TYPE, error, SF_BOOLEAN_FALSE, + 0, retry_max_count, retry_timeout, NULL, NULL, NULL, SF_BOOLEAN_FALSE)) { + // Error came from request up, just break + stop = SF_BOOLEAN_TRUE; + break; } if ( (json_error = json_copy_string_no_alloc(query_code, *json, "code", - QUERYCODE_LEN)) != + QUERYCODE_LEN)) != SF_JSON_ERROR_NONE && json_error != SF_JSON_ERROR_ITEM_NULL) { - stop = SF_BOOLEAN_TRUE; - JSON_ERROR_MSG(json_error, error_msg, "Query code"); - SET_SNOWFLAKE_ERROR(error, SF_STATUS_ERROR_BAD_JSON, error_msg, - SF_SQLSTATE_UNABLE_TO_CONNECT); - break; + stop = SF_BOOLEAN_TRUE; + JSON_ERROR_MSG(json_error, error_msg, "Query code"); + SET_SNOWFLAKE_ERROR(error, SF_STATUS_ERROR_BAD_JSON, error_msg, + SF_SQLSTATE_UNABLE_TO_CONNECT); + break; } } } From 942ae0a0dff4564cb2035af797a9842042ac91b2 Mon Sep 17 00:00:00 2001 From: norrislee Date: Thu, 19 Dec 2024 14:15:26 -0800 Subject: [PATCH 14/16] Fix memory issues in test cases --- tests/test_async.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/test_async.c b/tests/test_async.c index af1651204d..a9596ab171 100644 --- a/tests/test_async.c +++ b/tests/test_async.c @@ -30,11 +30,9 @@ void test_select() { int64 out = 0; assert_int_equal(snowflake_num_rows(sfstmt), 1); - int counter = 0; while ((status = snowflake_fetch(sfstmt)) == SF_STATUS_SUCCESS) { snowflake_column_as_int64(sfstmt, 1, &out); assert_int_equal(out, 1); - ++counter; } if (status != SF_STATUS_EOF) { dump_error(&(sfstmt->error)); @@ -91,6 +89,7 @@ void test_query_status() { assert_int_equal(status, SF_STATUS_EOF); snowflake_stmt_term(sfstmt); snowflake_term(sf); + SF_FREE(out); } /** @@ -130,6 +129,7 @@ void test_premature_fetch() { assert_int_equal(status, SF_STATUS_EOF); snowflake_stmt_term(sfstmt); snowflake_term(sf); + SF_FREE(out); } /** @@ -168,6 +168,7 @@ void test_new_connection() { assert_int_equal(status, SF_STATUS_SUCCESS); SF_STMT* async_sfstmt = snowflake_create_async_query_result(sf, sfqid); + SF_FREE(sfqid); /* get results */ int64 out = 0; From 75e6565a2faf6434f439e5104efd98c2e8135762 Mon Sep 17 00:00:00 2001 From: norrislee Date: Fri, 20 Dec 2024 16:41:01 -0800 Subject: [PATCH 15/16] organize enums, add test --- include/snowflake/client.h | 17 ++++++++-------- include/snowflake/platform.h | 2 +- lib/client.c | 23 +++++++++++----------- lib/platform.c | 2 +- tests/test_async.c | 38 ++++++++++++++++++++++++++++++++++++ 5 files changed, 61 insertions(+), 21 deletions(-) diff --git a/include/snowflake/client.h b/include/snowflake/client.h index 03828d595b..d1c091937d 100644 --- a/include/snowflake/client.h +++ b/include/snowflake/client.h @@ -298,19 +298,20 @@ typedef enum SF_STMT_ATTRIBUTE { * The query status */ typedef enum SF_QUERY_STATUS { - SF_QUERY_STATUS_RUNNING, + SF_QUERY_STATUS_ABORTED, SF_QUERY_STATUS_ABORTING, - SF_QUERY_STATUS_SUCCESS, + SF_QUERY_STATUS_BLOCKED, + SF_QUERY_STATUS_DISCONNECTED, SF_QUERY_STATUS_FAILED_WITH_ERROR, - SF_QUERY_STATUS_ABORTED, - SF_QUERY_STATUS_QUEUED, SF_QUERY_STATUS_FAILED_WITH_INCIDENT, - SF_QUERY_STATUS_DISCONNECTED, - SF_QUERY_STATUS_RESUMING_WAREHOUSE, + SF_QUERY_STATUS_NO_DATA, + SF_QUERY_STATUS_RUNNING, + SF_QUERY_STATUS_QUEUED, SF_QUERY_STATUS_QUEUED_REPAIRING_WAREHOUSE, SF_QUERY_STATUS_RESTARTED, - SF_QUERY_STATUS_BLOCKED, - SF_QUERY_STATUS_NO_DATA + SF_QUERY_STATUS_RESUMING_WAREHOUSE, + SF_QUERY_STATUS_SUCCESS, + SF_QUERY_STATUS_UNKNOWN } SF_QUERY_STATUS; /** diff --git a/include/snowflake/platform.h b/include/snowflake/platform.h index bc75f85843..b5d938c62d 100755 --- a/include/snowflake/platform.h +++ b/include/snowflake/platform.h @@ -142,7 +142,7 @@ void STDCALL sf_memory_error_handler(); // this should be called by application before any calls of sfclient void STDCALL sf_exception_on_memory_failure(); -void STDCALL sf_sleep_ms(int sleep_ms); +void sf_sleep_ms(int sleep_ms); #ifdef __cplusplus diff --git a/lib/client.c b/lib/client.c index 6448d4246d..1214173a81 100644 --- a/lib/client.c +++ b/lib/client.c @@ -57,19 +57,20 @@ _reset_connection_parameters(SF_CONNECT *sf, cJSON *parameters, cJSON *session_info, sf_bool do_validate); static const char* query_status_names[] = { - "RUNNING", + "ABORTED", "ABORTING", - "SUCCESS", + "BLOCKED", + "DISCONNECTED", "FAILED_WITH_ERROR", - "ABORTED", - "QUEUED", "FAILED_WITH_INCIDENT", - "DISCONNECTED", - "RESUMING_WAREHOUSE", + "NO_DATA", + "RUNNING", + "QUEUED", "QUEUED_REPAIRING_WAREHOUSE", "RESTARTED", - "BLOCKED", - "NO_DATA" + "RESUMING_WAREHOUSE", + "SUCCESS", + "UNKNOWN" }; /** @@ -84,10 +85,10 @@ sf_bool validate_application(const char *application); */ SF_QUERY_STATUS get_status_from_string(const char *query_status) { if (query_status == NULL) { - return SF_QUERY_STATUS_NO_DATA; + return SF_QUERY_STATUS_UNKNOWN; } int idx = 0, last = 0; - for (idx = 0, last = (int)SF_QUERY_STATUS_NO_DATA; idx <= last; ++idx) { + for (idx = 0, last = (int)SF_QUERY_STATUS_UNKNOWN; idx <= last; ++idx) { size_t len = strlen(query_status_names[idx]); if (sf_strncasecmp(query_status_names[idx], query_status, len) == 0) { return (SF_QUERY_STATUS)idx; @@ -108,7 +109,7 @@ char *get_query_metadata(SF_STMT* sfstmt) { cJSON *data = NULL; cJSON *queries = NULL; char *s_resp = NULL; - size_t url_size = strlen(QUERY_MONITOR_URL) -2 + strlen(sfstmt->sfqid) + 1; + size_t url_size = strlen(QUERY_MONITOR_URL) - 2 + strlen(sfstmt->sfqid) + 1; char *status_query = (char*)SF_CALLOC(1, url_size); sf_sprintf(status_query, url_size, QUERY_MONITOR_URL, sfstmt->sfqid); diff --git a/lib/platform.c b/lib/platform.c index 19543be520..de76c49975 100755 --- a/lib/platform.c +++ b/lib/platform.c @@ -236,7 +236,7 @@ struct tm* sfchrono_localtime(const time_t *timep, struct tm *tm) } #endif -void STDCALL sf_sleep_ms(int sleep_ms) +void sf_sleep_ms(int sleep_ms) { #ifdef _WIN32 Sleep(sleep_ms); diff --git a/tests/test_async.c b/tests/test_async.c index a9596ab171..e77edeb282 100644 --- a/tests/test_async.c +++ b/tests/test_async.c @@ -229,6 +229,43 @@ void test_invalid_query_id() { assert_null(async_sfstmt->result_set); } +void test_multiple_chunk() { + SF_CONNECT* sf = setup_snowflake_connection(); + SF_STATUS status = snowflake_connect(sf); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sf->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + + /* query */ + SF_STMT* sfstmt = snowflake_stmt(sf); + status = snowflake_prepare(sfstmt, "select randstr(100,random()) from table(generator(rowcount=>10000))", 0); + assert_int_equal(status, SF_STATUS_SUCCESS); + status = snowflake_async_execute(sfstmt); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sfstmt->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + + /* get results */ + char* value = NULL; + size_t value_len = 0; + size_t max_value_size = 0; + assert_int_equal(snowflake_num_rows(sfstmt), 10000); + + while ((status = snowflake_fetch(sfstmt)) == SF_STATUS_SUCCESS) { + snowflake_column_as_str(sfstmt, 1, &value, &value_len, &max_value_size); + assert_int_equal(strlen(value), 100); + } + SF_FREE(value); + if (status != SF_STATUS_EOF) { + dump_error(&(sfstmt->error)); + } + assert_int_equal(status, SF_STATUS_EOF); + snowflake_stmt_term(sfstmt); + snowflake_term(sf); +} + int main(void) { initialize_test(SF_BOOLEAN_FALSE); const struct CMUnitTest tests[] = { @@ -238,6 +275,7 @@ int main(void) { cmocka_unit_test(test_new_connection), cmocka_unit_test(test_fake_table), cmocka_unit_test(test_invalid_query_id), + cmocka_unit_test(test_multiple_chunk), }; int ret = cmocka_run_group_tests(tests, NULL, NULL); snowflake_global_term(); From 9e457ae115c217a9ebb435fbae1f7569c2af9e34 Mon Sep 17 00:00:00 2001 From: norrislee Date: Fri, 20 Dec 2024 17:24:51 -0800 Subject: [PATCH 16/16] Lower the rowcount for the test --- tests/test_async.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_async.c b/tests/test_async.c index e77edeb282..fd20be1d84 100644 --- a/tests/test_async.c +++ b/tests/test_async.c @@ -239,7 +239,7 @@ void test_multiple_chunk() { /* query */ SF_STMT* sfstmt = snowflake_stmt(sf); - status = snowflake_prepare(sfstmt, "select randstr(100,random()) from table(generator(rowcount=>10000))", 0); + status = snowflake_prepare(sfstmt, "select randstr(100,random()) from table(generator(rowcount=>2000))", 0); assert_int_equal(status, SF_STATUS_SUCCESS); status = snowflake_async_execute(sfstmt); if (status != SF_STATUS_SUCCESS) { @@ -251,7 +251,7 @@ void test_multiple_chunk() { char* value = NULL; size_t value_len = 0; size_t max_value_size = 0; - assert_int_equal(snowflake_num_rows(sfstmt), 10000); + assert_int_equal(snowflake_num_rows(sfstmt), 2000); while ((status = snowflake_fetch(sfstmt)) == SF_STATUS_SUCCESS) { snowflake_column_as_str(sfstmt, 1, &value, &value_len, &max_value_size);