Skip to content

Commit

Permalink
DVX: 318: Added support for file upload and download using presigned …
Browse files Browse the repository at this point in the history
…URLs
  • Loading branch information
Aryamanz29 committed Jun 20, 2024
1 parent f54645b commit 5a3bd39
Show file tree
Hide file tree
Showing 6 changed files with 293 additions and 21 deletions.
5 changes: 3 additions & 2 deletions atlan/assets/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/atlanhq/atlan-go/atlan/model"
"github.com/atlanhq/atlan-go/atlan/model/structs"
"hash/fnv"
"reflect"
"strings"
"time"

"github.com/atlanhq/atlan-go/atlan/model"
"github.com/atlanhq/atlan-go/atlan/model/structs"
)

// AtlanObject is an interface that all asset types should implement
Expand Down
165 changes: 148 additions & 17 deletions atlan/assets/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"bytes"
"encoding/json"
"fmt"
"github.com/atlanhq/atlan-go/atlan/logger"
"io"
"net/http"
"net/url"
"os"
"strings"

"github.com/atlanhq/atlan-go/atlan/logger"
)

// AtlanClient defines the Atlan API client structure.
Expand Down Expand Up @@ -187,8 +188,86 @@ func (ac *AtlanClient) DisableLogging() {
ac.SetLogger(false, "")
}

// Removes authorization from header when using
// s3PresignedUrlFileUpload/Download and returns the removed value.
func (ac *AtlanClient) removeAuthorization() (string, error) {
if headers, ok := ac.requestParams["headers"].(map[string]string); ok {
auth, exists := headers["Authorization"]
if exists {
delete(headers, "Authorization")
return auth, nil
}
return "", nil
} else {
return "", fmt.Errorf("unable to remove the \"Authorization\" key " +
"from AtlanClient.requestParams[\"headers\"]; it is not of type map[string]string")
}
}

// Restores the authorization to the header when using s3PresignedUrlFileUpload/Download .
func (ac *AtlanClient) restoreAuthorization(auth string) error {
if headers, ok := ac.requestParams["headers"].(map[string]string); ok {
headers["Authorization"] = auth
} else {
return fmt.Errorf("unable to restore the \"Authorization\" key " +
"to AtlanClient.requestParams[\"headers\"]; it is not of type map[string]string")
}
return nil
}

func (ac *AtlanClient) s3PresignedUrlFileUpload(api *API, uploadFile interface{}) (string, error) {
// Remove authorization and returns the auth value
auth, err := ac.removeAuthorization()
if err != nil {
return "", err
}

// Ensure the authorization is restored after the API call
defer func() {
if auth != "" {
restoreErr := ac.restoreAuthorization(auth)
if restoreErr != nil {
ac.logger.Errorf("failed to restore authorization: %v", restoreErr)
}
}
}()

// Call the API with upload file
response, err := ac.CallAPI(api, nil, uploadFile)
if err != nil {
return "", err
}
return string(response), nil
}

func (ac *AtlanClient) s3PresignedUrlFileDownload(api *API, downloadFile interface{}) (string, error) {
// Remove authorization and returns the auth value
auth, err := ac.removeAuthorization()
if err != nil {
return "", err
}

// Ensure the authorization is restored after the API call
defer func() {
if auth != "" {
restoreErr := ac.restoreAuthorization(auth)
if restoreErr != nil {
ac.logger.Errorf("failed to restore authorization: %v", restoreErr)
}
}
}()

// Call the API with download file
response, err := ac.CallAPI(api, nil, downloadFile)
if err != nil {
return "", err
}
return string(response), nil
}

// CallAPI makes a generic API call.
func (ac *AtlanClient) CallAPI(api *API, queryParams interface{}, requestObj interface{}) ([]byte, error) {
var downloadFile *os.File
params := deepCopy(ac.requestParams)
path := ac.host + api.Endpoint.Atlas + api.Path

Expand All @@ -213,35 +292,65 @@ func (ac *AtlanClient) CallAPI(api *API, queryParams interface{}, requestObj int
}

if requestObj != nil {
//fmt.Println("Request Object:", requestObj)
requestJSON, err := json.Marshal(requestObj)
logger.Log.Debugf("Request JSON: %s", string(requestJSON))
if err != nil {
ac.logger.Errorf("error marshaling request object: %v", err)
return nil, fmt.Errorf("error marshaling request object: %v", err)
switch reqObj := requestObj.(type) {
case bytes.Buffer:
// In case of binary data upload
// Make sure to use the presigned URL
// in the API request, i.e: api.Path
path = api.Path
params["data"] = reqObj
params["content_type"] = "application/octet-stream"
case *os.File:
// In case of file download
// Make sure to use the presigned URL
// in the API request, i.e: api.Path
path = api.Path
downloadFile = reqObj
default:
// Otherwise just use `json.Marshal()`
requestJSON, err := json.Marshal(requestObj)
ac.logger.Debugf("Request JSON: %s", string(requestJSON))
if err != nil {
ac.logger.Errorf("error marshaling request object: %v", err)
return nil, fmt.Errorf("error marshaling request object: %v", err)
}
params["data"] = bytes.NewBuffer(requestJSON)
}
params["data"] = bytes.NewBuffer(requestJSON)
}

ac.logAPICall(api.Method, path)

//logger.Log.Debugf("Params: %v", params)
// Send the request
response, err := ac.makeRequest(api.Method, path, params)
if err != nil {
return nil, handleApiError(response, err)
}

ac.logHTTPStatus(response)

// Handle API error based on response status code
if response.StatusCode != api.Status {
return nil, handleApiError(response, err)
}

// Handle file download
if downloadFile != nil {
_, err := io.Copy(downloadFile, response.Body)
if err != nil {
return nil, fmt.Errorf("failed to copy file contents: %v", err)
}
ac.logger.Infof("downloaded file saved to: %s", downloadFile.Name())
return []byte{}, nil
}

// Handle JSON response
responseJSON, err := io.ReadAll(response.Body)
response.Body.Close()
if err != nil {
return nil, fmt.Errorf("error reading response body: %v", err)
}

if response.StatusCode != api.Status {
return nil, handleApiError(response, err)
}
// Finally, close the request body
response.Body.Close()

ac.logResponse(responseJSON)

Expand All @@ -252,22 +361,34 @@ func (ac *AtlanClient) CallAPI(api *API, queryParams interface{}, requestObj int
func (ac *AtlanClient) makeRequest(method, path string, params map[string]interface{}) (*http.Response, error) {
var req *http.Request
var err error
var contentType string

switch method {
case http.MethodGet:
req, err = http.NewRequest(method, path, nil)
if err != nil {
return nil, ThrowAtlanError(err, CONNECTION_ERROR, nil)
}
case http.MethodPost, http.MethodPut:
body, ok := params["data"].(io.Reader)
var body io.Reader
data, ok := params["data"]
if !ok {
return nil, fmt.Errorf("missing or invalid 'data' parameter for POST/PUT/DELETE request")
return nil, fmt.Errorf("missing 'data' parameter for POST/PUT request")
}
switch v := data.(type) {
case bytes.Buffer:
// Binary data upload
body = &v
case io.Reader:
// JSON payload
body = v
default:
return nil, fmt.Errorf("invalid 'data' parameter type for POST/PUT request")
}
req, err = http.NewRequest(method, path, body)
if err != nil {
return nil, ThrowAtlanError(err, CONNECTION_ERROR, nil)
}
req.Header.Set("Content-Type", "application/json")
case http.MethodDelete:
// DELETE requests may not always have a body.
var body io.Reader
Expand All @@ -277,14 +398,14 @@ func (ac *AtlanClient) makeRequest(method, path string, params map[string]interf
return nil, fmt.Errorf("invalid 'data' parameter for DELETE request")
}
}
// Create a new http request
req, err = http.NewRequest(method, path, body)
if err != nil {
return nil, ThrowAtlanError(err, CONNECTION_ERROR, nil)
}
if body != nil {
req.Header.Set("Content-Type", "application/json")
}

default:
return nil, fmt.Errorf("unsupported HTTP method: %s", method)
}
Expand All @@ -301,6 +422,15 @@ func (ac *AtlanClient) makeRequest(method, path string, params map[string]interf
}
}

// Set content-type
if ct, ok := params["content_type"].(string); ok {
contentType = ct
} else {
// Default content type
contentType = "application/json"
}
req.Header.Set("Content-Type", contentType)

// Set query parameters
queryParams, ok := params["params"].(map[string]string)
if ok {
Expand All @@ -327,6 +457,7 @@ func (ac *AtlanClient) makeRequest(method, path string, params map[string]interf
req.URL.RawQuery = query
}

// Finally, execute the request
return ac.Session.Do(req)
}

Expand Down
16 changes: 15 additions & 1 deletion atlan/assets/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ const (
// Entities API
ENTITY_API = "entity/"
ENTITY_BULK_API = "entity/bulk/"

// Files API
FILES_API = "files/"
)

// API defines the structure of an API call.
Expand All @@ -34,7 +37,11 @@ var AtlasEndpoint = Endpoint{
Atlas: "/api/meta/",
}

// API calls for Atlas
var HeraclesEndpoint = Endpoint{
Atlas: "/api/service/",
}

// API calls to various services (Atlas, Heracles etc)
var (
GET_TYPEDEF_BY_NAME = API{
Path: TYPEDEF_BY_NAME,
Expand Down Expand Up @@ -126,6 +133,13 @@ var (
Status: http.StatusOK,
Endpoint: AtlasEndpoint,
}

PRESIGNED_URL = API{
Path: FILES_API + "presignedUrl",
Method: http.MethodPost,
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}
)

// Constants for the Atlas search DSL
Expand Down
1 change: 0 additions & 1 deletion atlan/assets/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,6 @@ func handleApiError(response *http.Response, originalError error) error {
default:
return ThrowAtlanError(originalError, ERROR_PASSTHROUGH, nil)
}
return nil
}

func ThrowAtlanError(err error, sdkError ErrorCode, suggestion *string, args ...interface{}) error {
Expand Down
Loading

0 comments on commit 5a3bd39

Please sign in to comment.