From 583e658b20f4fce04819ff180d9802c1a1b24a3a Mon Sep 17 00:00:00 2001 From: Goson Zhang <4675739@qq.com> Date: Thu, 16 Jan 2025 09:49:47 +0800 Subject: [PATCH] [INLONG-11675][SDK] Optimize IpUtils class related implementation (#11676) Co-authored-by: gosonzhang --- .../sdk/dataproxy/ProxyClientConfig.java | 7 +- .../sdk/dataproxy/codec/ProtocolEncoder.java | 7 +- .../dataproxy/config/ProxyConfigManager.java | 4 +- .../dataproxy/example/HttpClientExample.java | 7 +- .../dataproxy/example/TcpClientExample.java | 8 +- .../sdk/dataproxy/network/ClientMgr.java | 3 +- .../inlong/sdk/dataproxy/network/IpUtils.java | 139 ------------------ .../inlong/sdk/dataproxy/network/Sender.java | 5 +- .../sdk/dataproxy/network/SequentialID.java | 4 +- .../dataproxy/threads/MetricWorkerThread.java | 4 +- .../sdk/dataproxy/utils/AuthzUtils.java | 51 +++++++ .../sdk/dataproxy/utils/ProxyUtils.java | 24 +++ .../{UtilsTest.java => ProxyUtilsTest.java} | 6 +- .../sdk/dirtydata/InlongSdkDirtySender.java | 3 +- 14 files changed, 102 insertions(+), 170 deletions(-) delete mode 100644 inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/IpUtils.java create mode 100644 inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/AuthzUtils.java rename inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/{UtilsTest.java => ProxyUtilsTest.java} (88%) diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java index fedf925aa18..efb5ac940be 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java @@ -19,7 +19,6 @@ import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException; import org.apache.inlong.sdk.dataproxy.metric.MetricConfig; -import org.apache.inlong.sdk.dataproxy.network.IpUtils; import lombok.Data; import org.apache.commons.lang3.StringUtils; @@ -98,11 +97,8 @@ public class ProxyClientConfig { private int senderMaxAttempt = ConfigConstants.DEFAULT_SENDER_MAX_ATTEMPT; /* pay attention to the last url parameter ip */ - public ProxyClientConfig(String localHost, boolean visitManagerByHttp, String managerIp, + public ProxyClientConfig(boolean visitManagerByHttp, String managerIp, int managerPort, String inlongGroupId, String authSecretId, String authSecretKey) throws ProxySdkException { - if (StringUtils.isBlank(localHost)) { - throw new ProxySdkException("localHost is blank!"); - } if (StringUtils.isBlank(managerIp)) { throw new ProxySdkException("managerIp is Blank!"); } @@ -116,7 +112,6 @@ public ProxyClientConfig(String localHost, boolean visitManagerByHttp, String ma this.visitManagerByHttp = visitManagerByHttp; this.managerPort = managerPort; this.managerIP = managerIp; - IpUtils.validLocalIp(localHost); this.syncThreadPoolSize = ConfigConstants.SYNC_THREAD_POOL_SIZE; this.asyncCallbackSize = ConfigConstants.ASYNC_CALLBACK_SIZE; this.proxyHttpUpdateIntervalMinutes = ConfigConstants.PROXY_HTTP_UPDATE_INTERVAL_MINUTES; diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java index 1a20766e55e..bf1cc995027 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java @@ -20,9 +20,10 @@ import org.apache.inlong.common.msg.AttributeConstants; import org.apache.inlong.sdk.dataproxy.config.EncryptConfigEntry; import org.apache.inlong.sdk.dataproxy.config.EncryptInfo; -import org.apache.inlong.sdk.dataproxy.network.IpUtils; +import org.apache.inlong.sdk.dataproxy.utils.AuthzUtils; import org.apache.inlong.sdk.dataproxy.utils.EncryptUtil; import org.apache.inlong.sdk.dataproxy.utils.LogCounter; +import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; @@ -84,8 +85,8 @@ private ByteBuf writeToBuf8(EncodeObject object) { } long timestamp = System.currentTimeMillis(); int nonce = new SecureRandom(String.valueOf(timestamp).getBytes()).nextInt(Integer.MAX_VALUE); - endAttr = endAttr + "_userName=" + object.getUserName() + "&_clientIP=" + IpUtils.getLocalIp() - + "&_signature=" + IpUtils.generateSignature(object.getUserName(), + endAttr = endAttr + "_userName=" + object.getUserName() + "&_clientIP=" + ProxyUtils.getLocalIp() + + "&_signature=" + AuthzUtils.generateSignature(object.getUserName(), timestamp, nonce, object.getSecretKey()) + "&_timeStamp=" + timestamp + "&_nonce=" + nonce; } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java index 8ff1dc0ea92..27cc4ee8ef0 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java @@ -23,8 +23,8 @@ import org.apache.inlong.sdk.dataproxy.ConfigConstants; import org.apache.inlong.sdk.dataproxy.ProxyClientConfig; import org.apache.inlong.sdk.dataproxy.network.ClientMgr; -import org.apache.inlong.sdk.dataproxy.network.IpUtils; import org.apache.inlong.sdk.dataproxy.utils.LogCounter; +import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils; import org.apache.inlong.sdk.dataproxy.utils.Tuple2; import com.google.gson.Gson; @@ -850,7 +850,7 @@ private void addAuthorizationInfo(HttpPost httpPost) { private List buildProxyNodeQueryParams() { ArrayList params = new ArrayList<>(); - params.add(new BasicNameValuePair("ip", IpUtils.getLocalIp())); + params.add(new BasicNameValuePair("ip", ProxyUtils.getLocalIp())); params.add(new BasicNameValuePair("protocolType", clientConfig.getProtocolType())); return params; } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java index c22c3aed98f..d370623f8de 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java @@ -34,24 +34,23 @@ public static void main(String[] args) { String configBasePath = ""; String inLongManagerAddr = "127.0.0.1"; String inLongManagerPort = "8083"; - String localIP = "127.0.0.1"; String messageBody = "inlong message body!"; - HttpProxySender sender = getMessageSender(localIP, inLongManagerAddr, + HttpProxySender sender = getMessageSender(inLongManagerAddr, inLongManagerPort, inlongGroupId, true, false, configBasePath); sendHttpMessage(sender, inlongGroupId, inlongStreamId, messageBody); sender.close(); // close the sender } - public static HttpProxySender getMessageSender(String localIP, String inLongManagerAddr, + public static HttpProxySender getMessageSender(String inLongManagerAddr, String inLongManagerPort, String inlongGroupId, boolean requestByHttp, boolean isReadProxyIPFromLocal, String configBasePath) { ProxyClientConfig proxyConfig = null; HttpProxySender sender = null; try { - proxyConfig = new ProxyClientConfig(localIP, requestByHttp, inLongManagerAddr, + proxyConfig = new ProxyClientConfig(requestByHttp, inLongManagerAddr, Integer.valueOf(inLongManagerPort), inlongGroupId, "admin", "inlong");// user and password of manager proxyConfig.setConfigStoreBasePath(configBasePath); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java index 85012af1725..a85086ac38f 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java @@ -32,8 +32,6 @@ public class TcpClientExample { private static final Logger logger = LoggerFactory.getLogger(TcpClientExample.class); - public static String localIP = "127.0.0.1"; - /** * Example of client tcp. */ @@ -54,20 +52,20 @@ public static void main(String[] args) throws InterruptedException { TcpClientExample tcpClientExample = new TcpClientExample(); DefaultMessageSender sender = tcpClientExample - .getMessageSender(localIP, inLongManagerAddr, inLongManagerPort, + .getMessageSender(inLongManagerAddr, inLongManagerPort, inlongGroupId, true, false, configBasePath, msgType); tcpClientExample.sendTcpMessage(sender, inlongGroupId, inlongStreamId, messageBody, System.currentTimeMillis()); sender.close(); // close the sender } - public DefaultMessageSender getMessageSender(String localIP, String inLongManagerAddr, String inLongManagerPort, + public DefaultMessageSender getMessageSender(String inLongManagerAddr, String inLongManagerPort, String inlongGroupId, boolean requestByHttp, boolean isReadProxyIPFromLocal, String configBasePath, int msgType) { ProxyClientConfig dataProxyConfig = null; DefaultMessageSender messageSender = null; try { - dataProxyConfig = new ProxyClientConfig(localIP, requestByHttp, inLongManagerAddr, + dataProxyConfig = new ProxyClientConfig(requestByHttp, inLongManagerAddr, Integer.valueOf(inLongManagerPort), inlongGroupId, "admin", "inlong"); if (StringUtils.isNotEmpty(configBasePath)) { dataProxyConfig.setConfigStoreBasePath(configBasePath); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java index 27003f64111..608fcf67d4f 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java @@ -26,6 +26,7 @@ import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager; import org.apache.inlong.sdk.dataproxy.utils.EventLoopUtil; import org.apache.inlong.sdk.dataproxy.utils.LogCounter; +import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils; import org.apache.inlong.sdk.dataproxy.utils.Tuple2; import io.netty.bootstrap.Bootstrap; @@ -55,7 +56,7 @@ public class ClientMgr { private static final LogCounter logCounter = new LogCounter(10, 100000, 60 * 1000L); private static final LogCounter updConExptCnt = new LogCounter(10, 100000, 60 * 1000L); private static final LogCounter exptCounter = new LogCounter(10, 100000, 60 * 1000L); - private static final byte[] hbMsgBody = IpUtils.getLocalIp().getBytes(StandardCharsets.UTF_8); + private static final byte[] hbMsgBody = ProxyUtils.getLocalIp().getBytes(StandardCharsets.UTF_8); private final Sender sender; private final ProxyClientConfig configure; diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/IpUtils.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/IpUtils.java deleted file mode 100644 index 90a07167724..00000000000 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/IpUtils.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dataproxy.network; - -import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException; - -import org.apache.commons.codec.binary.Base64; -import org.apache.commons.codec.digest.HmacUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.UnsupportedEncodingException; -import java.net.DatagramSocket; -import java.net.InetAddress; -import java.net.URLEncoder; -import java.util.Iterator; -import java.util.List; -import java.util.Set; - -public class IpUtils { - - private static final Logger logger = LoggerFactory.getLogger(IpUtils.class); - private static String userIp; - - static { - userIp = getLocalIp(); - } - - public static String getLocalIp() { - if (userIp != null) { - return userIp; - } - String ip = "127.0.0.1"; - try (DatagramSocket socket = new DatagramSocket()) { - socket.connect(InetAddress.getByName("8.8.8.8"), 10002); - ip = socket.getLocalAddress().getHostAddress(); - } catch (Exception ignored) { - logger.warn("getLocalIp ", ignored); - } - userIp = ip; - return ip; - } - - public static boolean validLocalIp(String currLocalHost) throws ProxySdkException { - String ip = "127.0.0.1"; - try (DatagramSocket socket = new DatagramSocket()) { - socket.connect(InetAddress.getByName("8.8.8.8"), 10002); - ip = socket.getLocalAddress().getHostAddress(); - } catch (Exception ex) { - logger.error("error while get local ip", ex); - } - if (!ip.equals(currLocalHost)) { - logger.warn("ip is not equal {} {}", currLocalHost, ip); - } - userIp = ip; - return true; - } - - public static byte[] toBytes(String ipAddr) { - byte[] ret = new byte[4]; - try { - String[] ipArr = ipAddr.split("\\."); - ret[0] = (byte) (Integer.parseInt(ipArr[0]) & 0xFF); - ret[1] = (byte) (Integer.parseInt(ipArr[1]) & 0xFF); - ret[2] = (byte) (Integer.parseInt(ipArr[2]) & 0xFF); - ret[3] = (byte) (Integer.parseInt(ipArr[3]) & 0xFF); - return ret; - } catch (Exception e) { - throw new IllegalArgumentException(ipAddr + " is invalid IP"); - } - } - - public static int bytesToInt(byte[] bytes) { - int addr = bytes[3] & 0xFF; - addr |= ((bytes[2] << 8) & 0xFF00); - addr |= ((bytes[1] << 16) & 0xFF0000); - addr |= ((bytes[0] << 24) & 0xFF000000); - return addr; - } - - public static String convertListToString(List list, Character ch) { - if (list == null || list.isEmpty()) { - return ""; - } - StringBuilder sb = new StringBuilder(); - Iterator itr = list.iterator(); - sb.append(itr.next()); - while (itr.hasNext()) { - sb.append(ch).append(itr.next()); - } - return sb.toString(); - } - - public static String generateSignature(String secureId, long timestamp, int randomValue, String secureKey) { - Base64 base64 = new Base64(); - byte[] baseStr = base64.encode(HmacUtils.hmacSha1(secureKey, secureId + timestamp + randomValue)); - String result = ""; - try { - result = URLEncoder.encode(new String(baseStr), "UTF-8"); - } catch (UnsupportedEncodingException e) { - e.printStackTrace(); - } - return result; - } - - public static String getAuthorizenInfo(final String secretId, final String secretKey, long timestamp, int nonce) { - String signature = generateSignature(secretId, timestamp, nonce, secretKey); - return "manager " + secretId + " " + timestamp + " " + nonce + " " + signature; - } - - public static String convertSetToString(Set list, Character ch) { - if (list == null || list.isEmpty()) { - return ""; - } - StringBuilder sb = new StringBuilder(); - Iterator itr = list.iterator(); - sb.append(itr.next()); - while (itr.hasNext()) { - sb.append(ch).append(itr.next()); - } - return sb.toString(); - } - -} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java index 088ecb3d60c..4ba988b19f0 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java @@ -26,6 +26,7 @@ import org.apache.inlong.sdk.dataproxy.threads.MetricWorkerThread; import org.apache.inlong.sdk.dataproxy.threads.TimeoutScanThread; import org.apache.inlong.sdk.dataproxy.utils.LogCounter; +import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils; import org.apache.inlong.sdk.dataproxy.utils.Tuple2; import io.netty.channel.Channel; @@ -167,7 +168,7 @@ public SendResult syncSendMessage(EncodeObject encodeObject, String msgUUID) { } if (configure.isEnableMetric()) { metricWorker.recordNumByKey(encodeObject.getMessageId(), encodeObject.getGroupId(), - encodeObject.getStreamId(), IpUtils.getLocalIp(), encodeObject.getDt(), + encodeObject.getStreamId(), ProxyUtils.getLocalIp(), encodeObject.getDt(), encodeObject.getPackageTime(), encodeObject.getRealCnt()); } SendResult message; @@ -320,7 +321,7 @@ public void asyncSendMessage(EncodeObject encodeObject, } if (configure.isEnableMetric()) { metricWorker.recordNumByKey(encodeObject.getMessageId(), encodeObject.getGroupId(), - encodeObject.getStreamId(), IpUtils.getLocalIp(), encodeObject.getPackageTime(), + encodeObject.getStreamId(), ProxyUtils.getLocalIp(), encodeObject.getPackageTime(), encodeObject.getDt(), encodeObject.getRealCnt()); } // send message package time diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SequentialID.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SequentialID.java index 0ad2813448c..f4a396b940b 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SequentialID.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SequentialID.java @@ -17,6 +17,8 @@ package org.apache.inlong.sdk.dataproxy.network; +import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils; + import java.security.SecureRandom; import java.util.concurrent.atomic.AtomicInteger; @@ -24,7 +26,7 @@ public class SequentialID { private static final SecureRandom sRandom = new SecureRandom( Long.toString(System.nanoTime()).getBytes()); - private final String ip = IpUtils.getLocalIp(); + private final String ip = ProxyUtils.getLocalIp(); private final AtomicInteger id = new AtomicInteger(sRandom.nextInt()); public SequentialID() { diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java index 4d5459b8e14..a974adc803b 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java @@ -24,9 +24,9 @@ import org.apache.inlong.sdk.dataproxy.metric.MessageRecord; import org.apache.inlong.sdk.dataproxy.metric.MetricConfig; import org.apache.inlong.sdk.dataproxy.metric.MetricTimeNumSummary; -import org.apache.inlong.sdk.dataproxy.network.IpUtils; import org.apache.inlong.sdk.dataproxy.network.Sender; import org.apache.inlong.sdk.dataproxy.network.SequentialID; +import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -207,7 +207,7 @@ private void sendSingleLine(String line, String streamId, long dtTime) { EncodeObject encodeObject = new EncodeObject(Collections.singletonList(line.getBytes()), 7, false, false, false, dtTime, idGenerator.getNextInt(), - metricConfig.getMetricGroupId(), streamId, "", "", IpUtils.getLocalIp()); + metricConfig.getMetricGroupId(), streamId, "", "", ProxyUtils.getLocalIp()); MetricSendCallBack callBack = new MetricSendCallBack(encodeObject); tryToSendMetricToManager(encodeObject, callBack); } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/AuthzUtils.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/AuthzUtils.java new file mode 100644 index 00000000000..a5ed8d5fbe3 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/AuthzUtils.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dataproxy.utils; + +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.codec.digest.HmacUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; + +/** + * Authenticate Utils class + * + * Used to place public processing functions related to authentication and signature + */ +public class AuthzUtils { + + private static final Logger logger = LoggerFactory.getLogger(AuthzUtils.class); + private static final LogCounter exptCounter = new LogCounter(10, 200000, 60 * 1000L); + + public static String generateSignature(String secureId, long timestamp, int randomValue, String secureKey) { + Base64 base64 = new Base64(); + byte[] baseStr = base64.encode(HmacUtils.hmacSha1(secureKey, secureId + timestamp + randomValue)); + String result = ""; + try { + result = URLEncoder.encode(new String(baseStr), StandardCharsets.UTF_8.toString()); + } catch (Throwable ex) { + if (exptCounter.shouldPrint()) { + logger.warn("Generate signature throw exception", ex); + } + } + return result; + } +} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java index b7bd42ab2b5..6da97d47506 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java @@ -25,6 +25,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.DatagramSocket; +import java.net.InetAddress; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -34,16 +36,38 @@ public class ProxyUtils { private static final Logger logger = LoggerFactory.getLogger(ProxyUtils.class); + private static final LogCounter exceptCounter = new LogCounter(10, 200000, 60 * 1000L); + private static final int TIME_LENGTH = 13; private static final Set invalidAttr = new HashSet<>(); + private static String localHost; + static { + localHost = getLocalIp(); Collections.addAll(invalidAttr, "groupId", "streamId", "dt", "msgUUID", "cp", "cnt", "mt", "m", "sid", "t", "NodeIP", "messageId", "_file_status_check", "_secretId", "_signature", "_timeStamp", "_nonce", "_userName", "_clientIP", "_encyVersion", "_encyAesKey", "proxySend", "errMsg", "errCode", AttributeConstants.MSG_RPT_TIME); } + public static String getLocalIp() { + if (localHost != null) { + return localHost; + } + String ip = "127.0.0.1"; + try (DatagramSocket socket = new DatagramSocket()) { + socket.connect(InetAddress.getByName("8.8.8.8"), 10002); + ip = socket.getLocalAddress().getHostAddress(); + } catch (Throwable ex) { + if (exceptCounter.shouldPrint()) { + logger.error("DataProxy-SDK get local IP failure", ex); + } + } + localHost = ip; + return ip; + } + public static boolean isAttrKeysValid(Map attrsMap) { if (attrsMap == null || attrsMap.size() == 0) { return false; diff --git a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/UtilsTest.java b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyUtilsTest.java similarity index 88% rename from inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/UtilsTest.java rename to inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyUtilsTest.java index b1a0ccd7bb3..5755aac08d5 100644 --- a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/UtilsTest.java +++ b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyUtilsTest.java @@ -17,16 +17,16 @@ package org.apache.inlong.sdk.dataproxy; -import org.apache.inlong.sdk.dataproxy.network.IpUtils; +import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils; import org.junit.Assert; import org.junit.Test; -public class UtilsTest { +public class ProxyUtilsTest { @Test public void getLocalIp() { - String ip = IpUtils.getLocalIp(); + String ip = ProxyUtils.getLocalIp(); Assert.assertNotNull(ip); } diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java index 1965ef37e34..b029392a497 100644 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java @@ -27,7 +27,6 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import java.net.InetAddress; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -60,7 +59,7 @@ public void init() throws Exception { Preconditions.checkNotNull(authKey, "authKey cannot be null"); ProxyClientConfig proxyClientConfig = - new ProxyClientConfig(InetAddress.getLocalHost().getHostAddress(), true, + new ProxyClientConfig(true, inlongManagerAddr, inlongManagerPort, inlongGroupId, authId, authKey); proxyClientConfig.setOnlyUseLocalProxyConfig(false); proxyClientConfig.setAsyncCallbackSize(maxCallbackSize);