Skip to content

Commit

Permalink
SNOW-1732752: Prepare version 2.0.0 (#757)
Browse files Browse the repository at this point in the history
Co-authored-by: SimbaGithub <48035983+SimbaGithub@users.noreply.github.com>
Co-authored-by: sfc-gh-ext-simba-hx <harry.xi@insightsoftware.com>
Co-authored-by: Maxim Mishchenko <maxim.mishchenko@snowflake.com>
  • Loading branch information
4 people authored Jan 9, 2025
1 parent d3cef59 commit df3dce7
Show file tree
Hide file tree
Showing 48 changed files with 4,308 additions and 2,188 deletions.
10 changes: 6 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ set (SOURCE_FILES_PUT_GET
cpp/logger/SFLogger.hpp
cpp/logger/SecretDetector.cpp
cpp/logger/SecretDetector.hpp
cpp/lib/ResultSetPutGet.cpp
cpp/lib/ResultSetPutGet.hpp
include/snowflake/IFileTransferAgent.hpp
include/snowflake/ISFLogger.hpp
include/snowflake/IStatementPutGet.hpp
Expand All @@ -146,6 +148,7 @@ set(SOURCE_FILES_CPP_WRAPPER
include/snowflake/SFURL.hpp
include/snowflake/CurlDesc.hpp
include/snowflake/CurlDescPool.hpp
include/snowflake/BindUploader.hpp
cpp/lib/Exceptions.cpp
cpp/lib/Connection.cpp
cpp/lib/Statement.cpp
Expand All @@ -158,8 +161,6 @@ set(SOURCE_FILES_CPP_WRAPPER
cpp/lib/ClientQueryContextCache.cpp
cpp/lib/ClientQueryContextCache.hpp
cpp/lib/result_set.cpp
cpp/lib/result_set_arrow.cpp
cpp/lib/result_set_json.cpp
cpp/lib/ResultSet.cpp
cpp/lib/ResultSet.hpp
cpp/lib/ResultSetArrow.cpp
Expand All @@ -168,14 +169,15 @@ set(SOURCE_FILES_CPP_WRAPPER
cpp/lib/ResultSetJson.hpp
cpp/lib/Authenticator.cpp
cpp/lib/Authenticator.hpp
cpp/lib/BindUploader.cpp
cpp/lib/ClientBindUploader.hpp
cpp/lib/ClientBindUploader.cpp
cpp/jwt/jwtWrapper.cpp
cpp/util/SnowflakeCommon.cpp
cpp/util/SFURL.cpp
cpp/util/CurlDesc.cpp
cpp/util/CurlDescPool.cpp
lib/result_set.h
lib/result_set_arrow.h
lib/result_set_json.h
lib/query_context_cache.h
lib/curl_desc_pool.h
lib/authenticator.h)
Expand Down
4 changes: 3 additions & 1 deletion ci/build_linux.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,7 @@ docker run \
#remove image to save disk space on github
if [[ -n "$GITHUB_ACTIONS" ]]; then
docker rm -vf $(docker ps -aq --filter ancestor=${BUILD_IMAGE_NAME})
docker rmi -f "${BUILD_IMAGE_NAME}"
if [[ $CLIENT_CODE_COVERAGE -ne 1 ]] && [[ "$BUILD_TYPE" != "Debug" ]]; then
docker rmi -f "${BUILD_IMAGE_NAME}"
fi
fi
110 changes: 84 additions & 26 deletions cpp/FileTransferAgent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@
#include "FileTransferAgent.hpp"
#include "snowflake/SnowflakeTransferException.hpp"
#include "snowflake/IStatementPutGet.hpp"
#include "StatementPutGet.hpp"
#include "lib/ResultSetPutGet.hpp"
#include "util/Base64.hpp"
#include "SnowflakeS3Client.hpp"
#include "StorageClientFactory.hpp"
#include "crypto/CipherStreamBuf.hpp"
#include "crypto/Cryptor.hpp"
#include "util/CompressionUtil.hpp"
#include "util/ThreadPool.hpp"
#include "util/SnowflakeCommon.hpp"
#include "EncryptionProvider.hpp"
#include "logger/SFLogger.hpp"
#include "error.h"
#include "snowflake/platform.h"
#include "snowflake/SF_CRTFunctionSafe.h"
#include <chrono>
Expand All @@ -29,35 +32,11 @@
using ::std::string;
using ::std::vector;
using ::Snowflake::Client::RemoteStorageRequestOutcome;
using namespace Snowflake::Client::Util;

namespace
{
const std::string FILE_PROTOCOL = "file://";

void replaceStrAll(std::string& stringToReplace,
std::string const& oldValue,
std::string const& newValue)
{
size_t oldValueLen = oldValue.length();
size_t newValueLen = newValue.length();
if (0 == oldValueLen)
{
return;
}

size_t index = 0;
while (true) {
/* Locate the substring to replace. */
index = stringToReplace.find(oldValue, index);
if (index == std::string::npos) break;

/* Make the replacement. */
stringToReplace.replace(index, oldValueLen, newValue);

/* Advance index forward so the next iteration doesn't pick it up as well. */
index += newValueLen;
}
}
}

Snowflake::Client::FileTransferAgent::FileTransferAgent(
Expand Down Expand Up @@ -961,3 +940,82 @@ std::string Snowflake::Client::FileTransferAgent::getLocalFilePathFromCommand(

return localFilePath;
}

using namespace Snowflake::Client;
extern "C" {
SF_STATUS STDCALL _snowflake_execute_put_get_native(
SF_STMT* sfstmt,
void* upload_stream,
size_t stream_size,
struct SF_QUERY_RESULT_CAPTURE* result_capture)
{
if (!sfstmt)
{
return SF_STATUS_ERROR_STATEMENT_NOT_EXIST;
}
SF_CONNECT* sfconn = sfstmt->connection;
if (!sfconn)
{
return SF_STATUS_ERROR_CONNECTION_NOT_EXIST;
}
StatementPutGet stmtPutGet(sfstmt);
TransferConfig transConfig;
transConfig.caBundleFile = NULL; // use the one from global settings
transConfig.compressLevel = sfconn->put_compress_level;
transConfig.getSizeThreshold = sfconn->get_threshold;
transConfig.proxy = NULL; // use the one from statement
transConfig.tempDir = sfconn->put_temp_dir;
transConfig.useS3regionalUrl = sfconn->use_s3_regional_url;
string command(sfstmt->sql_text);

FileTransferAgent agent(&stmtPutGet, &transConfig);
agent.setPutFastFail(sfconn->put_fastfail);
agent.setPutMaxRetries(sfconn->put_maxretries);
agent.setGetFastFail(sfconn->get_fastfail);
agent.setGetMaxRetries(sfconn->get_maxretries);
agent.setRandomDeviceAsUrand(sfconn->put_use_urand_dev);

if (upload_stream)
{
agent.setUploadStream((std::basic_iostream<char>*)upload_stream, stream_size);
}

ITransferResult* result;
try
{
result = agent.execute(&command);
}
catch (std::exception& e)
{
std::string errmsg("File transfer failed: ");
errmsg += e.what();
SET_SNOWFLAKE_ERROR(&sfstmt->error, SF_STATUS_ERROR_FILE_TRANSFER,
errmsg.c_str(), SF_SQLSTATE_GENERAL_ERROR);
return SF_STATUS_ERROR_FILE_TRANSFER;
}
catch (...)
{
std::string errmsg("File transfer failed with unknown exception.");
SET_SNOWFLAKE_ERROR(&sfstmt->error, SF_STATUS_ERROR_FILE_TRANSFER,
errmsg.c_str(), SF_SQLSTATE_GENERAL_ERROR);
return SF_STATUS_ERROR_FILE_TRANSFER;
}

ResultSetPutGet * resultset = new Snowflake::Client::ResultSetPutGet(result);
if (!resultset)
{
std::string errmsg("Failed to allocate put get result set.");
SET_SNOWFLAKE_ERROR(&sfstmt->error, SF_STATUS_ERROR_OUT_OF_MEMORY,
errmsg.c_str(), SF_SQLSTATE_MEMORY_ALLOCATION_ERROR);
return SF_STATUS_ERROR_OUT_OF_MEMORY;
}

sfstmt->qrf = SF_PUTGET_FORMAT;
sfstmt->total_row_index = 0;
sfstmt->result_set = resultset;
sfstmt->chunk_rowcount = sfstmt->total_rowcount = result->getResultSize();
sfstmt->total_fieldcount = resultset->setup_column_desc(&sfstmt->desc);

return SF_STATUS_SUCCESS;
}
}
155 changes: 154 additions & 1 deletion cpp/StatementPutGet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,35 @@
*/

#include <client_int.h>
#include "connection.h"
#include "snowflake/PutGetParseResponse.hpp"
#include "StatementPutGet.hpp"
#include "curl_desc_pool.h"

using namespace Snowflake::Client;

static size_t file_get_write_callback(char* ptr, size_t size, size_t nmemb, void* userdata)
{
size_t data_size = size * nmemb;
std::basic_iostream<char>* recvStream = (std::basic_iostream<char>*)(userdata);
if (recvStream)
{
recvStream->write(static_cast<const char*>(ptr), data_size);
}

return data_size;
}

static size_t file_put_read_callback(void* ptr, size_t size, size_t nmemb, void* userdata)
{
std::basic_iostream<char>* payload = (std::basic_iostream<char>*)(userdata);
size_t data_size = size * nmemb;

payload->read(static_cast<char*>(ptr), data_size);
size_t ret = payload->gcount();
return payload->gcount();
}

StatementPutGet::StatementPutGet(SF_STMT *stmt) :
m_stmt(stmt), m_useProxy(false)
{
Expand All @@ -25,7 +49,7 @@ StatementPutGet::StatementPutGet(SF_STMT *stmt) :
bool StatementPutGet::parsePutGetCommand(std::string *sql,
PutGetParseResponse *putGetParseResponse)
{
if (snowflake_query(m_stmt, sql->c_str(), 0) != SF_STATUS_SUCCESS)
if (_snowflake_query_put_get_legacy(m_stmt, sql->c_str(), 0) != SF_STATUS_SUCCESS)
{
return false;
}
Expand Down Expand Up @@ -108,6 +132,14 @@ bool StatementPutGet::parsePutGetCommand(std::string *sql,
};
putGetParseResponse->stageInfo.endPoint = response->stage_info->endPoint;

}
else if (sf_strncasecmp(response->stage_info->location_type, "gcs", 3) == 0)
{
putGetParseResponse->stageInfo.stageType = StageType::GCS;
putGetParseResponse->stageInfo.credentials = {
{"GCS_ACCESS_TOKEN", response->stage_info->stage_cred->gcs_access_token}
};

} else if (sf_strncasecmp(response->stage_info->location_type,
"local_fs", 8) == 0)
{
Expand All @@ -127,3 +159,124 @@ Util::Proxy* StatementPutGet::get_proxy()
return &m_proxy;
}
}

bool StatementPutGet::http_put(std::string const& url,
std::vector<std::string> const& headers,
std::basic_iostream<char>& payload,
size_t payloadLen,
std::string& responseHeaders)
{
if (!m_stmt || !m_stmt->connection)
{
return false;
}
SF_CONNECT* sf = m_stmt->connection;
void* curl_desc = get_curl_desc_from_pool(url.c_str(), sf->proxy, sf->no_proxy);
CURL* curl = get_curl_from_desc(curl_desc);
if (!curl)
{
return false;
}

char* urlbuf = (char*)SF_CALLOC(1, url.length() + 1);
sf_strcpy(urlbuf, url.length() + 1, url.c_str());

SF_HEADER reqHeaders;
reqHeaders.header = NULL;
for (auto itr = headers.begin(); itr != headers.end(); itr++)
{
reqHeaders.header = curl_slist_append(reqHeaders.header, itr->c_str());
}

PUT_PAYLOAD putPayload;
putPayload.buffer = &payload;
putPayload.length = payloadLen;
putPayload.read_callback = file_put_read_callback;

char* respHeaders = NULL;
sf_bool success = SF_BOOLEAN_FALSE;

success = http_perform(curl, PUT_REQUEST_TYPE, urlbuf, &reqHeaders, NULL, &putPayload, NULL,
NULL, &respHeaders, get_retry_timeout(sf),
SF_BOOLEAN_FALSE, &m_stmt->error, sf->insecure_mode,sf->ocsp_fail_open,
sf->retry_on_curle_couldnt_connect_count,
0, sf->retry_count, NULL, NULL, NULL, SF_BOOLEAN_FALSE,
sf->proxy, sf->no_proxy, SF_BOOLEAN_FALSE, SF_BOOLEAN_FALSE);

free_curl_desc(curl_desc);
SF_FREE(urlbuf);
curl_slist_free_all(reqHeaders.header);
if (respHeaders)
{
responseHeaders = std::string(respHeaders);
SF_FREE(respHeaders);
}

return success;
}

bool StatementPutGet::http_get(std::string const& url,
std::vector<std::string> const& headers,
std::basic_iostream<char>* payload,
std::string& responseHeaders,
bool headerOnly)
{
SF_REQUEST_TYPE reqType = GET_REQUEST_TYPE;
if (headerOnly)
{
reqType = HEAD_REQUEST_TYPE;
}

if (!m_stmt || !m_stmt->connection)
{
return false;
}
SF_CONNECT* sf = m_stmt->connection;

void* curl_desc = get_curl_desc_from_pool(url.c_str(), sf->proxy, sf->no_proxy);
CURL* curl = get_curl_from_desc(curl_desc);
if (!curl)
{
return false;
}

char* urlbuf = (char*)SF_CALLOC(1, url.length() + 1);
sf_strcpy(urlbuf, url.length() + 1, url.c_str());

SF_HEADER reqHeaders;
reqHeaders.header = NULL;
for (auto itr = headers.begin(); itr != headers.end(); itr++)
{
reqHeaders.header = curl_slist_append(reqHeaders.header, itr->c_str());
}

NON_JSON_RESP resp;
resp.buffer = payload;
resp.write_callback = file_get_write_callback;

char* respHeaders = NULL;
sf_bool success = SF_BOOLEAN_FALSE;

success = http_perform(curl, reqType, urlbuf, &reqHeaders, NULL, NULL, NULL,
&resp, &respHeaders, get_retry_timeout(sf),
SF_BOOLEAN_FALSE, &m_stmt->error, sf->insecure_mode, sf->ocsp_fail_open,
sf->retry_on_curle_couldnt_connect_count,
0, sf->retry_count, NULL, NULL, NULL, SF_BOOLEAN_FALSE,
sf->proxy, sf->no_proxy, SF_BOOLEAN_FALSE, SF_BOOLEAN_FALSE);

free_curl_desc(curl_desc);
SF_FREE(urlbuf);
curl_slist_free_all(reqHeaders.header);
if (respHeaders)
{
responseHeaders = respHeaders;
SF_FREE(respHeaders);
}

if (payload)
{
payload->flush();
}

return success;
}
Loading

0 comments on commit df3dce7

Please sign in to comment.