Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1732752: Prepare version 2.0.0 #757

Merged
merged 16 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ set (SOURCE_FILES_PUT_GET
cpp/logger/SFLogger.hpp
cpp/logger/SecretDetector.cpp
cpp/logger/SecretDetector.hpp
cpp/lib/ResultSetPutGet.cpp
cpp/lib/ResultSetPutGet.hpp
include/snowflake/IFileTransferAgent.hpp
include/snowflake/ISFLogger.hpp
include/snowflake/IStatementPutGet.hpp
Expand All @@ -146,6 +148,7 @@ set(SOURCE_FILES_CPP_WRAPPER
include/snowflake/SFURL.hpp
include/snowflake/CurlDesc.hpp
include/snowflake/CurlDescPool.hpp
include/snowflake/BindUploader.hpp
cpp/lib/Exceptions.cpp
cpp/lib/Connection.cpp
cpp/lib/Statement.cpp
Expand All @@ -158,8 +161,6 @@ set(SOURCE_FILES_CPP_WRAPPER
cpp/lib/ClientQueryContextCache.cpp
cpp/lib/ClientQueryContextCache.hpp
cpp/lib/result_set.cpp
cpp/lib/result_set_arrow.cpp
cpp/lib/result_set_json.cpp
cpp/lib/ResultSet.cpp
cpp/lib/ResultSet.hpp
cpp/lib/ResultSetArrow.cpp
Expand All @@ -168,14 +169,15 @@ set(SOURCE_FILES_CPP_WRAPPER
cpp/lib/ResultSetJson.hpp
cpp/lib/Authenticator.cpp
cpp/lib/Authenticator.hpp
cpp/lib/BindUploader.cpp
cpp/lib/ClientBindUploader.hpp
cpp/lib/ClientBindUploader.cpp
cpp/jwt/jwtWrapper.cpp
cpp/util/SnowflakeCommon.cpp
cpp/util/SFURL.cpp
cpp/util/CurlDesc.cpp
cpp/util/CurlDescPool.cpp
lib/result_set.h
lib/result_set_arrow.h
lib/result_set_json.h
lib/query_context_cache.h
lib/curl_desc_pool.h
lib/authenticator.h)
Expand Down
4 changes: 3 additions & 1 deletion ci/build_linux.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,7 @@ docker run \
#remove image to save disk space on github
if [[ -n "$GITHUB_ACTIONS" ]]; then
docker rm -vf $(docker ps -aq --filter ancestor=${BUILD_IMAGE_NAME})
docker rmi -f "${BUILD_IMAGE_NAME}"
if [[ $CLIENT_CODE_COVERAGE -ne 1 ]] && [[ "$BUILD_TYPE" != "Debug" ]]; then
docker rmi -f "${BUILD_IMAGE_NAME}"
fi
fi
110 changes: 84 additions & 26 deletions cpp/FileTransferAgent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@
#include "FileTransferAgent.hpp"
#include "snowflake/SnowflakeTransferException.hpp"
#include "snowflake/IStatementPutGet.hpp"
#include "StatementPutGet.hpp"
#include "lib/ResultSetPutGet.hpp"
#include "util/Base64.hpp"
#include "SnowflakeS3Client.hpp"
#include "StorageClientFactory.hpp"
#include "crypto/CipherStreamBuf.hpp"
#include "crypto/Cryptor.hpp"
#include "util/CompressionUtil.hpp"
#include "util/ThreadPool.hpp"
#include "util/SnowflakeCommon.hpp"
#include "EncryptionProvider.hpp"
#include "logger/SFLogger.hpp"
#include "error.h"
#include "snowflake/platform.h"
#include "snowflake/SF_CRTFunctionSafe.h"
#include <chrono>
Expand All @@ -29,35 +32,11 @@
using ::std::string;
using ::std::vector;
using ::Snowflake::Client::RemoteStorageRequestOutcome;
using namespace Snowflake::Client::Util;

namespace
{
const std::string FILE_PROTOCOL = "file://";

void replaceStrAll(std::string& stringToReplace,
std::string const& oldValue,
std::string const& newValue)
{
size_t oldValueLen = oldValue.length();
size_t newValueLen = newValue.length();
if (0 == oldValueLen)
{
return;
}

size_t index = 0;
while (true) {
/* Locate the substring to replace. */
index = stringToReplace.find(oldValue, index);
if (index == std::string::npos) break;

/* Make the replacement. */
stringToReplace.replace(index, oldValueLen, newValue);

/* Advance index forward so the next iteration doesn't pick it up as well. */
index += newValueLen;
}
}
}

Snowflake::Client::FileTransferAgent::FileTransferAgent(
Expand Down Expand Up @@ -961,3 +940,82 @@ std::string Snowflake::Client::FileTransferAgent::getLocalFilePathFromCommand(

return localFilePath;
}

using namespace Snowflake::Client;
extern "C" {
SF_STATUS STDCALL _snowflake_execute_put_get_native(
SF_STMT* sfstmt,
void* upload_stream,
size_t stream_size,
struct SF_QUERY_RESULT_CAPTURE* result_capture)
{
if (!sfstmt)
{
return SF_STATUS_ERROR_STATEMENT_NOT_EXIST;
}
SF_CONNECT* sfconn = sfstmt->connection;
if (!sfconn)
{
return SF_STATUS_ERROR_CONNECTION_NOT_EXIST;
}
StatementPutGet stmtPutGet(sfstmt);
TransferConfig transConfig;
transConfig.caBundleFile = NULL; // use the one from global settings
transConfig.compressLevel = sfconn->put_compress_level;
transConfig.getSizeThreshold = sfconn->get_threshold;
transConfig.proxy = NULL; // use the one from statement
transConfig.tempDir = sfconn->put_temp_dir;
transConfig.useS3regionalUrl = sfconn->use_s3_regional_url;
string command(sfstmt->sql_text);

FileTransferAgent agent(&stmtPutGet, &transConfig);
agent.setPutFastFail(sfconn->put_fastfail);
agent.setPutMaxRetries(sfconn->put_maxretries);
agent.setGetFastFail(sfconn->get_fastfail);
agent.setGetMaxRetries(sfconn->get_maxretries);
agent.setRandomDeviceAsUrand(sfconn->put_use_urand_dev);

if (upload_stream)
{
agent.setUploadStream((std::basic_iostream<char>*)upload_stream, stream_size);
}

ITransferResult* result;
try
{
result = agent.execute(&command);
}
catch (std::exception& e)
{
std::string errmsg("File transfer failed: ");
errmsg += e.what();
SET_SNOWFLAKE_ERROR(&sfstmt->error, SF_STATUS_ERROR_FILE_TRANSFER,
errmsg.c_str(), SF_SQLSTATE_GENERAL_ERROR);
return SF_STATUS_ERROR_FILE_TRANSFER;
}
catch (...)
{
std::string errmsg("File transfer failed with unknown exception.");
SET_SNOWFLAKE_ERROR(&sfstmt->error, SF_STATUS_ERROR_FILE_TRANSFER,
errmsg.c_str(), SF_SQLSTATE_GENERAL_ERROR);
return SF_STATUS_ERROR_FILE_TRANSFER;
}

ResultSetPutGet * resultset = new Snowflake::Client::ResultSetPutGet(result);
if (!resultset)
{
std::string errmsg("Failed to allocate put get result set.");
SET_SNOWFLAKE_ERROR(&sfstmt->error, SF_STATUS_ERROR_OUT_OF_MEMORY,
errmsg.c_str(), SF_SQLSTATE_MEMORY_ALLOCATION_ERROR);
return SF_STATUS_ERROR_OUT_OF_MEMORY;
}

sfstmt->qrf = SF_PUTGET_FORMAT;
sfstmt->total_row_index = 0;
sfstmt->result_set = resultset;
sfstmt->chunk_rowcount = sfstmt->total_rowcount = result->getResultSize();
sfstmt->total_fieldcount = resultset->setup_column_desc(&sfstmt->desc);

return SF_STATUS_SUCCESS;
}
}
155 changes: 154 additions & 1 deletion cpp/StatementPutGet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,35 @@
*/

#include <client_int.h>
#include "connection.h"
#include "snowflake/PutGetParseResponse.hpp"
#include "StatementPutGet.hpp"
#include "curl_desc_pool.h"

using namespace Snowflake::Client;

static size_t file_get_write_callback(char* ptr, size_t size, size_t nmemb, void* userdata)
{
size_t data_size = size * nmemb;
std::basic_iostream<char>* recvStream = (std::basic_iostream<char>*)(userdata);
if (recvStream)
{
recvStream->write(static_cast<const char*>(ptr), data_size);
}

return data_size;
}

static size_t file_put_read_callback(void* ptr, size_t size, size_t nmemb, void* userdata)
{
std::basic_iostream<char>* payload = (std::basic_iostream<char>*)(userdata);
size_t data_size = size * nmemb;

payload->read(static_cast<char*>(ptr), data_size);
size_t ret = payload->gcount();
return payload->gcount();
}

StatementPutGet::StatementPutGet(SF_STMT *stmt) :
m_stmt(stmt), m_useProxy(false)
{
Expand All @@ -25,7 +49,7 @@ StatementPutGet::StatementPutGet(SF_STMT *stmt) :
bool StatementPutGet::parsePutGetCommand(std::string *sql,
PutGetParseResponse *putGetParseResponse)
{
if (snowflake_query(m_stmt, sql->c_str(), 0) != SF_STATUS_SUCCESS)
if (_snowflake_query_put_get_legacy(m_stmt, sql->c_str(), 0) != SF_STATUS_SUCCESS)
{
return false;
}
Expand Down Expand Up @@ -108,6 +132,14 @@ bool StatementPutGet::parsePutGetCommand(std::string *sql,
};
putGetParseResponse->stageInfo.endPoint = response->stage_info->endPoint;

}
else if (sf_strncasecmp(response->stage_info->location_type, "gcs", 3) == 0)
{
putGetParseResponse->stageInfo.stageType = StageType::GCS;
putGetParseResponse->stageInfo.credentials = {
{"GCS_ACCESS_TOKEN", response->stage_info->stage_cred->gcs_access_token}
};

} else if (sf_strncasecmp(response->stage_info->location_type,
"local_fs", 8) == 0)
{
Expand All @@ -127,3 +159,124 @@ Util::Proxy* StatementPutGet::get_proxy()
return &m_proxy;
}
}

bool StatementPutGet::http_put(std::string const& url,
std::vector<std::string> const& headers,
std::basic_iostream<char>& payload,
size_t payloadLen,
std::string& responseHeaders)
{
if (!m_stmt || !m_stmt->connection)
{
return false;
}
SF_CONNECT* sf = m_stmt->connection;
void* curl_desc = get_curl_desc_from_pool(url.c_str(), sf->proxy, sf->no_proxy);
CURL* curl = get_curl_from_desc(curl_desc);
if (!curl)
{
return false;
}

char* urlbuf = (char*)SF_CALLOC(1, url.length() + 1);
sf_strcpy(urlbuf, url.length() + 1, url.c_str());

SF_HEADER reqHeaders;
reqHeaders.header = NULL;
for (auto itr = headers.begin(); itr != headers.end(); itr++)
{
reqHeaders.header = curl_slist_append(reqHeaders.header, itr->c_str());
}

PUT_PAYLOAD putPayload;
putPayload.buffer = &payload;
putPayload.length = payloadLen;
putPayload.read_callback = file_put_read_callback;

char* respHeaders = NULL;
sf_bool success = SF_BOOLEAN_FALSE;

success = http_perform(curl, PUT_REQUEST_TYPE, urlbuf, &reqHeaders, NULL, &putPayload, NULL,
NULL, &respHeaders, get_retry_timeout(sf),
SF_BOOLEAN_FALSE, &m_stmt->error, sf->insecure_mode,sf->ocsp_fail_open,
sf->retry_on_curle_couldnt_connect_count,
0, sf->retry_count, NULL, NULL, NULL, SF_BOOLEAN_FALSE,
sf->proxy, sf->no_proxy, SF_BOOLEAN_FALSE, SF_BOOLEAN_FALSE);

free_curl_desc(curl_desc);
SF_FREE(urlbuf);
curl_slist_free_all(reqHeaders.header);
if (respHeaders)
{
responseHeaders = std::string(respHeaders);
SF_FREE(respHeaders);
}

return success;
}

bool StatementPutGet::http_get(std::string const& url,
std::vector<std::string> const& headers,
std::basic_iostream<char>* payload,
std::string& responseHeaders,
bool headerOnly)
{
SF_REQUEST_TYPE reqType = GET_REQUEST_TYPE;
if (headerOnly)
{
reqType = HEAD_REQUEST_TYPE;
}

if (!m_stmt || !m_stmt->connection)
{
return false;
}
SF_CONNECT* sf = m_stmt->connection;

void* curl_desc = get_curl_desc_from_pool(url.c_str(), sf->proxy, sf->no_proxy);
CURL* curl = get_curl_from_desc(curl_desc);
if (!curl)
{
return false;
}

char* urlbuf = (char*)SF_CALLOC(1, url.length() + 1);
sf_strcpy(urlbuf, url.length() + 1, url.c_str());

SF_HEADER reqHeaders;
reqHeaders.header = NULL;
for (auto itr = headers.begin(); itr != headers.end(); itr++)
{
reqHeaders.header = curl_slist_append(reqHeaders.header, itr->c_str());
}

NON_JSON_RESP resp;
resp.buffer = payload;
resp.write_callback = file_get_write_callback;

char* respHeaders = NULL;
sf_bool success = SF_BOOLEAN_FALSE;

success = http_perform(curl, reqType, urlbuf, &reqHeaders, NULL, NULL, NULL,
&resp, &respHeaders, get_retry_timeout(sf),
SF_BOOLEAN_FALSE, &m_stmt->error, sf->insecure_mode, sf->ocsp_fail_open,
sf->retry_on_curle_couldnt_connect_count,
0, sf->retry_count, NULL, NULL, NULL, SF_BOOLEAN_FALSE,
sf->proxy, sf->no_proxy, SF_BOOLEAN_FALSE, SF_BOOLEAN_FALSE);

free_curl_desc(curl_desc);
SF_FREE(urlbuf);
curl_slist_free_all(reqHeaders.header);
if (respHeaders)
{
responseHeaders = respHeaders;
SF_FREE(respHeaders);
}

if (payload)
{
payload->flush();
}

return success;
}
Loading
Loading