Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug fix for stitch and put #2917

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public Future<GetBlobResult> getBlob(String blobId, GetBlobOptions options, Call
}

@Override
public Future<String> putBlob(RestRequest restRequest, BlobProperties blobProperties, byte[] userMetadata, ReadableStreamChannel channel,
public Future<String> putBlob(RestRequest restRequest, BlobProperties blobProperties, BlobInfo blobInfo, ReadableStreamChannel channel,
PutBlobOptions options, Callback<String> callback, QuotaChargeCallback quotaChargeCallback) {
lock.lock();
try {
Expand All @@ -110,7 +110,7 @@ public Future<String> putBlob(RestRequest restRequest, BlobProperties blobProper
try {
InputStream input = new ReadableStreamChannelInputStream(channel);
byte[] bytes = Utils.readBytesFromStream(input, (int) size);
BlobInfoAndData blob = new BlobInfoAndData(new BlobInfo(blobProperties, userMetadata), bytes);
BlobInfoAndData blob = new BlobInfoAndData(new BlobInfo(blobProperties, blobInfo.getUserMetadata()), bytes);
String id;
do {
id = TestUtils.getRandomString(10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ public byte[] getUserMetadata() {
return userMetadata;
}

/**
* Set the user metadata of the blob.
* @param userMetadata The user metadata to set.
*/
public void setUserMetadata(byte[] userMetadata) {
this.userMetadata = userMetadata;
}

/**
* Set the lifeVersion of this blob.
* @param lifeVersion The lifeVersion to set.
Expand Down
14 changes: 8 additions & 6 deletions ambry-api/src/main/java/com/github/ambry/router/Router.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ Future<GetBlobResult> getBlob(String blobId, GetBlobOptions options, Callback<Ge
* @param blobProperties The properties of the blob. Note that the size specified in the properties is ignored.
* The channel is consumed fully, and the size of the blob is the number of bytes read from
* it.
* @param userMetadata Optional user metadata about the blob. This can be null.
* @param blobInfo The blob info which include the original blob property and user metadata.
* @param channel The {@link ReadableStreamChannel} that contains the content of the blob.
* @param options The {@link PutBlobOptions} associated with the request. This cannot be null.
* @param callback The {@link Callback} which will be invoked on the completion of the request .
* @param quotaChargeCallback Listener interface to charge quota cost for the operation.
* @return A future that would contain the BlobId eventually.
*/
Future<String> putBlob(RestRequest restRequest, BlobProperties blobProperties, byte[] userMetadata, ReadableStreamChannel channel,
Future<String> putBlob(RestRequest restRequest, BlobProperties blobProperties, BlobInfo blobInfo, ReadableStreamChannel channel,
PutBlobOptions options, Callback<String> callback, QuotaChargeCallback quotaChargeCallback);

/**
Expand Down Expand Up @@ -189,17 +189,19 @@ default CompletableFuture<String> stitchBlob(BlobProperties blobProperties, byte
/**
* Requests for a new blob to be put asynchronously and returns a future that will eventually contain the BlobId of
* the new blob on a successful response.
*
* @param blobProperties The properties of the blob. Note that the size specified in the properties is ignored. The
* channel is consumed fully, and the size of the blob is the number of bytes read from it.
* @param userMetadata Optional user metadata about the blob. This can be null.
* @param channel The {@link ReadableStreamChannel} that contains the content of the blob.
* @param options The {@link PutBlobOptions} associated with the request. This cannot be null.
* @param userMetadata Optional user metadata about the blob. This can be null.
* @param channel The {@link ReadableStreamChannel} that contains the content of the blob.
* @param options The {@link PutBlobOptions} associated with the request. This cannot be null.
* @return A future that would contain the BlobId eventually.
*/
default CompletableFuture<String> putBlob(BlobProperties blobProperties, byte[] userMetadata,
ReadableStreamChannel channel, PutBlobOptions options) {
CompletableFuture<String> future = new CompletableFuture<>();
putBlob(null, blobProperties, userMetadata, channel, options, CallbackUtils.fromCompletableFuture(future), null);
BlobInfo blobInfo = new BlobInfo(blobProperties, userMetadata);
putBlob(null, blobProperties, blobInfo, channel, options, CallbackUtils.fromCompletableFuture(future), null);
return future;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public void handlePost(RestRequest restRequest, RestResponseChannel restResponse
restRequest.setArg(RestUtils.InternalKeys.TARGET_CONTAINER_KEY, Container.UNKNOWN_CONTAINER);
BlobProperties blobProperties = RestUtils.buildBlobProperties(restRequest.getArgs());
byte[] usermetadata = RestUtils.buildUserMetadata(restRequest.getArgs());
router.putBlob(null, blobProperties, usermetadata, restRequest, new PutBlobOptionsBuilder().build(),
BlobInfo blobInfo = new BlobInfo(blobProperties, usermetadata);
router.putBlob(null, blobProperties, blobInfo, restRequest, new PutBlobOptionsBuilder().build(),
new MockPostCallback(this, restRequest, restResponseChannel, blobProperties), null);
} catch (RestServiceException e) {
handleResponse(restRequest, restResponseChannel, null, e);
Expand Down Expand Up @@ -158,7 +159,8 @@ public void handlePut(RestRequest restRequest, RestResponseChannel restResponseC
restRequest.setArg(RestUtils.InternalKeys.TARGET_CONTAINER_KEY, Container.UNKNOWN_CONTAINER);
BlobProperties blobProperties = RestUtils.buildBlobProperties(restRequest.getArgs());
byte[] usermetadata = RestUtils.buildUserMetadata(restRequest.getArgs());
router.putBlob(null, blobProperties, usermetadata, restRequest, new PutBlobOptionsBuilder().build(),
BlobInfo blobInfo = new BlobInfo(blobProperties, usermetadata);
router.putBlob(null, blobProperties, blobInfo, restRequest, new PutBlobOptionsBuilder().build(),
new MockPostCallback(this, restRequest, restResponseChannel, blobProperties), null);
} catch (RestServiceException e) {
handleResponse(restRequest, restResponseChannel, null, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.github.ambry.messageformat.BlobInfo;
import com.github.ambry.messageformat.BlobProperties;
import com.github.ambry.named.NamedBlobDb;
import com.github.ambry.named.NamedBlobRecord;
import com.github.ambry.protocol.DatasetVersionState;
import com.github.ambry.quota.QuotaManager;
import com.github.ambry.quota.QuotaUtils;
Expand Down Expand Up @@ -233,7 +234,7 @@ private Callback<Void> securityPostProcessRequestCallback(BlobInfo blobInfo) {
addDatasetVersion(blobInfo.getBlobProperties(), restRequest);
}
PutBlobOptions options = getPutBlobOptionsFromRequest();
router.putBlob(restRequest, getPropertiesForRouterUpload(blobInfo), blobInfo.getUserMetadata(), restRequest, options,
router.putBlob(restRequest, getPropertiesForRouterUpload(blobInfo), blobInfo, restRequest, options,
routerPutBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true));
}
}, uri, LOGGER, deleteDatasetCallback);
Expand All @@ -257,7 +258,7 @@ private Callback<String> routerPutBlobCallback(BlobInfo blobInfo) {
retryExecutor.runWithRetries(retryPolicy,
callback -> router.updateBlobTtl(restRequest, blobIdClean, serviceId, Utils.Infinite_Time, callback,
QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, false)),
this::isRetriable, routerTtlUpdateCallback(blobInfo));
this::isRetriable, routerTtlUpdateCallbackForPut(blobInfo));
} else {
if (RestUtils.isDatasetVersionQueryEnabled(restRequest.getArgs())) {
//Make sure to process response after delete finished
Expand Down Expand Up @@ -321,7 +322,7 @@ private Callback<String> idConverterCallback(BlobInfo blobInfo, String blobId) {
retryExecutor.runWithRetries(retryPolicy,
callback -> router.updateBlobTtl(null, blobId, serviceId, Utils.Infinite_Time, callback,
QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, false)), this::isRetriable,
routerTtlUpdateCallback(blobInfo));
routerTtlUpdateCallbackForStitch(blobInfo, blobId));
} else {
if (RestUtils.isDatasetVersionQueryEnabled(restRequest.getArgs())) {
//Make sure to process response after delete finished
Expand Down Expand Up @@ -351,7 +352,7 @@ private boolean isRetriable(Throwable throwable) {
* @param blobInfo the {@link BlobInfo} to use for security checks.
* @return a {@link Callback} to be used with {@link Router#updateBlobTtl(String, String, long)}.
*/
private Callback<Void> routerTtlUpdateCallback(BlobInfo blobInfo) {
private Callback<Void> routerTtlUpdateCallbackForPut(BlobInfo blobInfo) {
return buildCallback(frontendMetrics.updateBlobTtlRouterMetrics, convertedBlobId -> {
if (RestUtils.isDatasetVersionQueryEnabled(restRequest.getArgs())) {
//Make sure to process response after delete finished
Expand All @@ -364,6 +365,37 @@ private Callback<Void> routerTtlUpdateCallback(BlobInfo blobInfo) {
}, uri, LOGGER, deleteDatasetCallback);
}

/**
* After TTL update finishes, call {@link SecurityService#postProcessRequest} to perform request time security
* checks that rely on the request being fully parsed and any additional arguments set.
*
* @param blobInfo the {@link BlobInfo} to use for security checks.
* @return a {@link Callback} to be used with {@link Router#updateBlobTtl(String, String, long)}.
*/
private Callback<Void> routerTtlUpdateCallbackForStitch(BlobInfo blobInfo, String blobId) {
return buildCallback(frontendMetrics.updateBlobTtlRouterMetrics, convertedBlobId -> {
// Set the named blob state to be 'READY' after the Ttl update succeed
if (!restRequest.getArgs().containsKey(RestUtils.InternalKeys.NAMED_BLOB_VERSION)) {
throw new RestServiceException("Internal key " + RestUtils.InternalKeys.NAMED_BLOB_VERSION
+ " is required in Named Blob TTL update callback!", RestServiceErrorCode.InternalServerError);
}
long namedBlobVersion = (long) restRequest.getArgs().get(NAMED_BLOB_VERSION);
String blobIdClean = RestUtils.stripSlashAndExtensionFromId(blobId);
NamedBlobPath namedBlobPath = NamedBlobPath.parse(RestUtils.getRequestPath(restRequest), restRequest.getArgs());
NamedBlobRecord record = new NamedBlobRecord(namedBlobPath.getAccountName(), namedBlobPath.getContainerName(),
namedBlobPath.getBlobName(), blobIdClean, Utils.Infinite_Time, namedBlobVersion);
namedBlobDb.updateBlobTtlAndStateToReady(record).get();
if (RestUtils.isDatasetVersionQueryEnabled(restRequest.getArgs())) {
//Make sure to process response after delete finished
updateVersionStateAndDeleteDatasetVersionOutOfRetentionCount(
deleteDatasetVersionOutOfRetentionCallback(blobInfo));
} else {
securityService.processResponse(restRequest, restResponseChannel, blobInfo,
securityProcessResponseCallback());
}
}, uri, LOGGER, deleteDatasetCallback);
}

/**
* After updateVersionStateAndDeleteDatasetVersionOutOfRetentionCount, call {@link SecurityService#processResponse}.
* @param blobInfo the {@link BlobInfo} to use for security checks.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ private Callback<Void> securityPostProcessRequestCallback(BlobInfo blobInfo) {
restRequest.readInto(channel, fetchStitchRequestBodyCallback(channel, blobInfo));
} else {
PutBlobOptions options = getPutBlobOptionsFromRequest();
router.putBlob(null, blobInfo.getBlobProperties(), blobInfo.getUserMetadata(), restRequest, options,
router.putBlob(null, blobInfo.getBlobProperties(), blobInfo, restRequest, options,
routerPutBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true));
}
}, uri, LOGGER, finalCallback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ private Callback<Void> securityPostProcessRequestCallback(BlobInfo blobInfo) {
finalCallback.onCompletion(null, null);
} else {
PutBlobOptions options = getPutBlobOptionsFromRequest();
router.putBlob(null, blobInfo.getBlobProperties(), blobInfo.getUserMetadata(), restRequest, options,
router.putBlob(null, blobInfo.getBlobProperties(), blobInfo, restRequest, options,
routerPutBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true));
}
}, uri, logger, finalCallback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4475,7 +4475,7 @@ public Future<GetBlobResult> getBlob(String blobId, GetBlobOptions options, Call
}

@Override
public Future<String> putBlob(RestRequest restRequest, BlobProperties blobProperties, byte[] usermetadata, ReadableStreamChannel channel,
public Future<String> putBlob(RestRequest restRequest, BlobProperties blobProperties, BlobInfo usermetadata, ReadableStreamChannel channel,
PutBlobOptions options, Callback<String> callback, QuotaChargeCallback quotaChargeCallback) {
return completeOperation(TestUtils.getRandomString(10), callback, OpType.PutBlob);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.github.ambry.commons.ByteBufferReadableStreamChannel;
import com.github.ambry.config.FrontendConfig;
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.messageformat.BlobInfo;
import com.github.ambry.messageformat.BlobProperties;
import com.github.ambry.quota.QuotaMethod;
import com.github.ambry.quota.QuotaTestUtils;
Expand Down Expand Up @@ -90,7 +91,8 @@ public TtlUpdateHandlerTest() throws Exception {
new AccountAndContainerInjector(ACCOUNT_SERVICE, metrics, config);
ReadableStreamChannel channel = new ByteBufferReadableStreamChannel(ByteBuffer.wrap(BLOB_DATA));
router = new InMemoryRouter(new VerifiableProperties(new Properties()), CLUSTER_MAP, idConverterFactory);
blobId = router.putBlob(null, BLOB_PROPERTIES, new byte[0], channel, new PutBlobOptionsBuilder().build(), null,
BlobInfo blobInfo = new BlobInfo(BLOB_PROPERTIES, new byte[0]);
blobId = router.putBlob(null, BLOB_PROPERTIES, blobInfo, channel, new PutBlobOptionsBuilder().build(), null,
QuotaTestUtils.createTestQuotaChargeCallback(QuotaMethod.WRITE)).get(1, TimeUnit.SECONDS);
idConverterFactory.translation = blobId;
ttlUpdateHandler =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.github.ambry.commons.ByteBufferReadableStreamChannel;
import com.github.ambry.config.FrontendConfig;
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.messageformat.BlobInfo;
import com.github.ambry.messageformat.BlobProperties;
import com.github.ambry.quota.QuotaMethod;
import com.github.ambry.quota.QuotaTestUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,20 +352,20 @@ public Future<GetBlobResult> getBlob(String blobIdStr, GetBlobOptions options, f
* @param restRequest The {@link RestRequest} to put the blob.
* @param blobProperties The properties of the blob. Note that the size specified in the properties is ignored. The
* channel is consumed fully, and the size of the blob is the number of bytes read from it.
* @param userMetadata Optional user metadata about the blob. This can be null.
* @param blobInfo Optional user metadata about the blob. This can be null.
* @param channel The {@link ReadableStreamChannel} that contains the content of the blob.
* @param options The {@link PutBlobOptions} associated with the request. This cannot be null.
* @param callback The {@link Callback} which will be invoked on the completion of the request .
* @return A future that would contain the BlobId eventually.
*/
@Override
public Future<String> putBlob(RestRequest restRequest, BlobProperties blobProperties, byte[] userMetadata, ReadableStreamChannel channel,
public Future<String> putBlob(RestRequest restRequest, BlobProperties blobProperties, BlobInfo blobInfo, ReadableStreamChannel channel,
PutBlobOptions options, Callback<String> callback, QuotaChargeCallback quotaChargeCallback) {
if (blobProperties == null || channel == null || options == null) {
throw new IllegalArgumentException("blobProperties, channel, or options must not be null");
}
if (userMetadata == null) {
userMetadata = new byte[0];
if (blobInfo.getUserMetadata() == null) {
blobInfo.setUserMetadata(new byte[0]);
}
currentOperationsCount.incrementAndGet();
if (blobProperties.isEncrypted()) {
Expand All @@ -375,10 +375,11 @@ public Future<String> putBlob(RestRequest restRequest, BlobProperties blobProper
}
routerMetrics.operationQueuingRate.mark();
FutureResult<String> futureResult = new FutureResult<>();
//the blob properties passed into id converter should be the original property from blobInfo.
Callback<String> wrappedCallback =
restRequest != null ? createIdConverterCallbackForPut(restRequest, blobProperties, futureResult, callback) : callback;
restRequest != null ? createIdConverterCallbackForPut(restRequest, blobInfo.getBlobProperties(), futureResult, callback) : callback;
if (isOpen.get()) {
getOperationController().putBlob(blobProperties, userMetadata, channel, options, futureResult, wrappedCallback,
getOperationController().putBlob(blobProperties, blobInfo.getUserMetadata(), channel, options, futureResult, wrappedCallback,
quotaChargeCallback);
} else {
RouterException routerException =
Expand Down
Loading
Loading