[OSPP2024]feat: support aliyun OSS Sink Connector #540

Merged
merged 6 commits into from
Oct 30, 2024

Conversation

limbo-24
Contributor

No description provided.

try {
OSSObject ossObject = ossClient.getObject(bucketName, absolutePath);
InputStream inputStream = ossObject.getObjectContent();
offset = inputStream.available();
Contributor

Roll the file once its size exceeds a threshold. For example, when the file grows larger than 200 MB, create a new file for writing and archive the old one.

Contributor Author

Before writing to OSS, check the offset size. If it exceeds OBJECT_SIZE_THRESHOLD, append the current timestamp as a suffix to the specified file name. For example: test.txt -> test.txt_2024-09-21-22:33:00
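
The rolling rule described above can be sketched as follows. This is a minimal illustration, not the code merged in the PR: the class name `ObjectNameRoller`, the method `resolveObjectName`, and the 200 MB value (taken from the reviewer's example) are assumptions; only `OBJECT_SIZE_THRESHOLD` and the `test.txt_2024-09-21-22:33:00` naming pattern come from the conversation.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper illustrating the "roll by size" naming rule.
final class ObjectNameRoller {
    // Assumed value: the conversation only gives 200 MB as an example.
    static final long OBJECT_SIZE_THRESHOLD = 200L * 1024 * 1024;

    // Matches the suffix shown in the example: 2024-09-21-22:33:00
    private static final DateTimeFormatter SUFFIX_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd-HH:mm:ss");

    /**
     * Returns the object name to write to. If the current offset (bytes
     * already in the object) exceeds the threshold, the timestamp is
     * appended as a suffix; otherwise the base name is returned unchanged.
     */
    static String resolveObjectName(String baseName, long currentOffset, LocalDateTime now) {
        if (currentOffset <= OBJECT_SIZE_THRESHOLD) {
            return baseName;
        }
        return baseName + "_" + now.format(SUFFIX_FORMAT);
    }
}
```

Passing the clock value in as a parameter, rather than calling `LocalDateTime.now()` inside the method, keeps the rule deterministic and easy to test.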

try {
handleRecord(sinkRecord);
} catch (OSSException oe) {
System.out.println("Caught an OSSException, which means your request made it to OSS, "
Contributor

Don't write messages to STDOUT; use a logger instead.

Contributor Author

Fixed: replaced System.out.println with log.error.
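
For illustration, the change amounts to routing the caught exception through a logger rather than `System.out.println`. The sketch below uses `java.util.logging` only to stay self-contained; the PR's `log.error` call presumably belongs to a different logging facade, and the class name `SinkErrorLogging`, the method `handleSafely`, and the log message are hypothetical.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical wrapper showing logger-based error reporting.
final class SinkErrorLogging {
    static final Logger LOG = Logger.getLogger("SinkErrorLogging");

    /**
     * Runs the given record handler. On failure, logs the exception with its
     * stack trace at SEVERE level instead of printing to STDOUT, and reports
     * the failure to the caller through the return value.
     */
    static boolean handleSafely(Runnable handleRecord) {
        try {
            handleRecord.run();
            return true;
        } catch (RuntimeException e) {
            // RuntimeException stands in for the SDK exception caught in the PR.
            LOG.log(Level.SEVERE, "OSSSinkTask | put | error => ", e);
            return false;
        }
    }
}
```

Passing the exception object to the logger preserves the stack trace, which a plain string printed to STDOUT would lose.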

log.error("OSSSinkTask | genObjectOffset | error => ", e);
}
});
if (enableBatchPut && !recordMap.isEmpty()) {
Contributor

When any exception happens above, I think processMap() should not be called.

Contributor Author

Added a boolean variable hasException to ensure processMap is called only if all records are processed successfully.
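
A minimal sketch of that guard, under stated assumptions: `hasException` is the flag named in the comment, while the class `GuardedBatch`, the method `putAll`, and the `handler`/`flush` parameters are hypothetical stand-ins for the task's per-record handling and `processMap()`.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration of the hasException guard.
final class GuardedBatch {
    /**
     * Applies the handler to every record, remembering whether any call
     * threw. The flush step (standing in for processMap()) runs only when
     * the batch is non-empty and every record was handled successfully.
     * Returns true if the flush ran.
     */
    static <T> boolean putAll(List<T> records, Consumer<T> handler, Runnable flush) {
        boolean hasException = false;
        for (T record : records) {
            try {
                handler.accept(record);
            } catch (RuntimeException e) {
                // A real task would log the failure here; the sketch only records it.
                hasException = true;
            }
        }
        if (!hasException && !records.isEmpty()) {
            flush.run();
            return true;
        }
        return false;
    }
}
```

With this shape, one failing record prevents the whole batch from being flushed, which matches the reviewer's request; whether remaining records should still be handled after the first failure is a separate design choice that the conversation does not settle.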

Contributor

@ingdex left a comment

LGTM

| region | String | YES | OSS region | cn-beijing |
| partitionMethod | String | YES | Partition mode: Normal means no partitioning, Time means partitioning by time | Time |
| fileUrlPrefix | String | YES | URL prefix of the object | file1/ |
| enableBatchPut | String | NO | Whether to enable batch mode | true |

@2011shenlin, Oct 25, 2024

Under what circumstances will enableBatchPut be set to false? If such a scenario does not exist, this parameter can be removed.

Contributor Author

Removed enableBatchPut.


<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-connect-oss</artifactId>
<version>0.0.1-SNAPSHOT</version>
Contributor

@ingdex, Oct 28, 2024

Use a release version.

Contributor Author

Fixed.

@2011shenlin merged commit 0da281f into apache:master on Oct 30, 2024
1 check passed