Skip to content

Commit

Permalink
Merge branch 'feature/431675-api-s3asset' into 'develop'
Browse files Browse the repository at this point in the history
Feature/431675 api s3asset

See merge request upm-inesdata/inesdata-connector!16
  • Loading branch information
ralconada-gmv committed Jun 4, 2024
2 parents 8e75ff7 + ffc1681 commit dc3e389
Show file tree
Hide file tree
Showing 10 changed files with 431 additions and 2 deletions.
3 changes: 3 additions & 0 deletions extensions/store-asset-api/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# S3 Asset API

Provides a management API for uploading files to S3 and creating assets.
34 changes: 34 additions & 0 deletions extensions/store-asset-api/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
plugins {
`java-library`
id("com.gmv.inesdata.edc-application")
}

dependencies {
api(libs.edc.spi.asset)
api(libs.edc.spi.core)
implementation(libs.edc.spi.transform)
implementation(libs.edc.web.spi)

implementation(libs.edc.connector.core)
implementation(libs.edc.api.core)
implementation(libs.edc.lib.util)
implementation(libs.edc.lib.transform)
implementation(libs.edc.dsp.api.configuration)
implementation(libs.edc.api.management.config)
implementation(libs.edc.transaction.spi)
implementation(libs.edc.lib.validator)
implementation(libs.edc.validator.spi)
implementation(libs.swagger.annotations.jakarta)
implementation(libs.jakarta.rsApi)
implementation(libs.jakarta.eeApi)
implementation(libs.parsson)
implementation(libs.jersey)
implementation(libs.edc.json.ld.lib)
implementation(libs.aws.s3)
implementation(libs.aws.s3.transfer)
implementation(libs.edc.api.asset)
implementation(libs.edc.control.plane.transform)
implementation(libs.edc.aws.s3.core)
runtimeOnly(libs.edc.spi.jsonld)
runtimeOnly(libs.edc.json.ld.lib)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package org.upm.inesdata.storageasset;

import jakarta.json.Json;
import org.eclipse.edc.api.validation.DataAddressValidator;
import org.eclipse.edc.connector.api.management.configuration.ManagementApiConfiguration;
import org.eclipse.edc.connector.controlplane.api.management.asset.validation.AssetValidator;
import org.eclipse.edc.connector.controlplane.services.spi.asset.AssetService;
import org.eclipse.edc.connector.controlplane.transform.edc.from.JsonObjectFromAssetTransformer;
import org.eclipse.edc.connector.controlplane.transform.edc.to.JsonObjectToAssetTransformer;
import org.eclipse.edc.jsonld.spi.JsonLd;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.types.TypeManager;
import org.eclipse.edc.transaction.spi.TransactionContext;
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;
import org.eclipse.edc.validator.spi.JsonObjectValidatorRegistry;
import org.eclipse.edc.web.spi.WebService;
import org.upm.inesdata.storageasset.controller.StorageAssetApiController;
import org.upm.inesdata.storageasset.service.S3Service;
import software.amazon.awssdk.regions.Region;

import java.util.Map;

import static org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset.EDC_ASSET_TYPE;
import static org.eclipse.edc.spi.constants.CoreConstants.JSON_LD;
import static org.eclipse.edc.spi.types.domain.DataAddress.EDC_DATA_ADDRESS_TYPE;
/**
* Extension that provides an API for managing vocabularies
*/
@Extension(value = StorageAssetApiExtension.NAME)
public class StorageAssetApiExtension implements ServiceExtension {

public static final String NAME = "StorageAsset API Extension";

@Inject
private AssetService assetService;

@Inject
private WebService webService;

@Inject
private ManagementApiConfiguration config;

@Inject
private TypeManager typeManager;

@Inject
private TransactionContext transactionContext;

@Inject
private TypeTransformerRegistry transformerRegistry;

@Inject
private JsonObjectValidatorRegistry validator;
@Inject
private JsonLd jsonLd;


@Override
public String name() {
return NAME;
}

/**
* Initializes the service
*/
@Override
public void initialize(ServiceExtensionContext context) {
var monitor = context.getMonitor();

var managementApiTransformerRegistry = transformerRegistry.forContext("management-api");

var factory = Json.createBuilderFactory(Map.of());
var jsonLdMapper = typeManager.getMapper(JSON_LD);
managementApiTransformerRegistry.register(new JsonObjectFromAssetTransformer(factory, jsonLdMapper));
managementApiTransformerRegistry.register(new JsonObjectToAssetTransformer());

validator.register(EDC_ASSET_TYPE, AssetValidator.instance());
validator.register(EDC_DATA_ADDRESS_TYPE, DataAddressValidator.instance());

// Leer las variables de entorno
String accessKey = context.getSetting("edc.aws.access.key","");
String secretKey = context.getSetting("edc.aws.secret.access.key","");
String endpointOverride = context.getSetting("edc.aws.endpoint.override","");
String regionName = context.getSetting("edc.aws.region","");
String bucketName = context.getSetting("edc.aws.bucket.name","");

Region region = Region.of(regionName);

// Crear una instancia de S3Service
S3Service s3Service = new S3Service(accessKey, secretKey, endpointOverride, region, bucketName);

var storageAssetApiController = new StorageAssetApiController(assetService, managementApiTransformerRegistry,
validator,s3Service,
jsonLd, bucketName, regionName);
webService.registerResource(config.getContextAlias(), storageAssetApiController);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package org.upm.inesdata.storageasset.controller;

import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.info.Info;
import io.swagger.v3.oas.annotations.media.ArraySchema;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.parameters.RequestBody;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.json.JsonObject;
import jakarta.ws.rs.core.MediaType;
import org.eclipse.edc.api.model.ApiCoreSchema;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.glassfish.jersey.media.multipart.FormDataParam;

import java.io.InputStream;

@OpenAPIDefinition(
info = @Info(description = "Manages the connector s3 assets.",
title = "S3 Asset API", version = "1"))
@Tag(name = "S3Asset")
public interface StorageAssetApi {

/**
* Creates a new storage asset
*
* @param fileInputStream the input stream of the file to be uploaded
* @param fileDetail the details of the file to be uploaded
* @param assetJson the input stream of the asset metadata in JSON format
* @return JsonObject with the created asset
*/
@Operation(description = "Creates a new S3 asset",
requestBody = @RequestBody(content = @Content(mediaType = MediaType.MULTIPART_FORM_DATA, schema = @Schema(
type = "object", requiredProperties = {"file", "json"}
))),
responses = {
@ApiResponse(responseCode = "200", description = "S3 asset was created successfully",
content = @Content(schema = @Schema(implementation = ApiCoreSchema.IdResponseSchema.class))),
@ApiResponse(responseCode = "400", description = "Request body was malformed",
content = @Content(array = @ArraySchema(schema = @Schema(implementation = ApiCoreSchema.ApiErrorDetailSchema.class)))),
@ApiResponse(responseCode = "409", description = "Could not create asset, because an asset with that ID already exists",
content = @Content(array = @ArraySchema(schema = @Schema(implementation = ApiCoreSchema.ApiErrorDetailSchema.class))))
}
)
JsonObject createStorageAsset(@FormDataParam("file") InputStream fileInputStream,
@FormDataParam("file") FormDataContentDisposition fileDetail,
@FormDataParam("json") JsonObject assetJson);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package org.upm.inesdata.storageasset.controller;

import jakarta.json.JsonObject;
import jakarta.servlet.annotation.MultipartConfig;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.eclipse.edc.api.model.IdResponse;
import org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset;
import org.eclipse.edc.connector.controlplane.services.spi.asset.AssetService;
import org.eclipse.edc.jsonld.spi.JsonLd;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.constants.CoreConstants;
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;
import org.eclipse.edc.util.string.StringUtils;
import org.eclipse.edc.validator.spi.JsonObjectValidatorRegistry;
import org.eclipse.edc.web.spi.exception.InvalidRequestException;
import org.eclipse.edc.web.spi.exception.ValidationFailureException;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.glassfish.jersey.media.multipart.FormDataParam;
import org.upm.inesdata.storageasset.service.S3Service;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;

import static org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset.EDC_ASSET_TYPE;
import static org.eclipse.edc.web.spi.exception.ServiceResultHandler.exceptionMapper;

@MultipartConfig
@Consumes(MediaType.MULTIPART_FORM_DATA)
@Produces(MediaType.APPLICATION_JSON)
@Path("/s3assets")
public class StorageAssetApiController implements StorageAssetApi {
private final TypeTransformerRegistry transformerRegistry;
private final AssetService service;
private final JsonObjectValidatorRegistry validator;
private final S3Service s3Service;

private final JsonLd jsonLd;

private final String bucketName;
private final String region;

public StorageAssetApiController(AssetService service, TypeTransformerRegistry transformerRegistry,
JsonObjectValidatorRegistry validator, S3Service s3Service, JsonLd jsonLd, String bucketName, String region) {
this.transformerRegistry = transformerRegistry;
this.service = service;
this.validator = validator;
this.s3Service = s3Service;
this.jsonLd = jsonLd;
this.bucketName = bucketName;
this.region = region;
}

@POST
@Override
public JsonObject createStorageAsset(@FormDataParam("file") InputStream fileInputStream,
@FormDataParam("file") FormDataContentDisposition fileDetail, @FormDataParam("json") JsonObject assetJson) {

String fileName = fileDetail.getFileName();

InputStream bufferedInputStream = new BufferedInputStream(fileInputStream);

JsonObject expand = jsonLd.expand(assetJson).orElseThrow((f) -> new EdcException("Failed to expand request"));
// Validación
validator.validate(EDC_ASSET_TYPE, expand).orElseThrow(ValidationFailureException::new);

// Transformación
var asset = transformerRegistry.transform(expand, Asset.class).orElseThrow(InvalidRequestException::new);

// Guardar fichero en MinIO
// Calcular el tamaño del fichero manualmente
long contentLength = 0;
try {
contentLength = getFileSize(bufferedInputStream);
} catch (IOException e) {
throw new EdcException("Failed to process file size", e);
}
String folder = String.valueOf(asset.getDataAddress().getProperties().get(CoreConstants.EDC_NAMESPACE+"folder"));
String fullKey = StringUtils.isNullOrBlank(folder) || "null".equals(folder)?fileName:(folder.endsWith("/") ? folder + fileName : folder + "/" + fileName);
s3Service.uploadFile(fullKey,bufferedInputStream, contentLength);
try {
setStorageProperties(asset, fullKey);

// Creación de asset
var idResponse = service.create(asset)
.map(a -> IdResponse.Builder.newInstance().id(a.getId()).createdAt(a.getCreatedAt()).build())
.orElseThrow(exceptionMapper(Asset.class, asset.getId()));

return transformerRegistry.transform(idResponse, JsonObject.class)
.orElseThrow(f -> new EdcException(f.getFailureDetail()));
} catch (Exception e) {
// Eliminar el archivo en caso de fallo
s3Service.deleteFile(fullKey);
throw new EdcException("Failed to process multipart data", e);
}
}

private long getFileSize(InputStream inputStream) throws IOException {
byte[] buffer = new byte[8192];
int bytesRead;
long size = 0;

inputStream.mark(Integer.MAX_VALUE);

while ((bytesRead = inputStream.read(buffer)) != -1) {
size += bytesRead;
}

inputStream.reset();

return size;
}

private void setStorageProperties(Asset asset, String fileName) {
asset.getPrivateProperties().put("storageAssetFile", fileName);
asset.getDataAddress().setKeyName(fileName);
asset.getDataAddress().setType("InesDataStore");
asset.getDataAddress().getProperties().put(CoreConstants.EDC_NAMESPACE+ "bucketName", bucketName);
asset.getDataAddress().getProperties().put(CoreConstants.EDC_NAMESPACE+"region", region);
}
}
Loading

0 comments on commit dc3e389

Please sign in to comment.