Skip to content

Commit

Permalink
Refine PeerServerSegmentFinder (apache#12933)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Apr 16, 2024
1 parent c08ba2c commit ec452a4
Show file tree
Hide file tree
Showing 11 changed files with 220 additions and 339 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ public abstract class BaseSegmentFetcher implements SegmentFetcher {
public static final String RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY = "retry.delay.scale.factor";
public static final int DEFAULT_RETRY_COUNT = 3;
public static final int DEFAULT_RETRY_WAIT_MS = 100;
public static final int DEFAULT_RETRY_DELAY_SCALE_FACTOR = 5;
public static final double DEFAULT_RETRY_DELAY_SCALE_FACTOR = 5;

protected final Logger _logger = LoggerFactory.getLogger(getClass().getSimpleName());

protected int _retryCount;
protected int _retryWaitMs;
protected int _retryDelayScaleFactor;
protected double _retryDelayScaleFactor;
protected AuthProvider _authProvider;

@Override
Expand All @@ -58,9 +58,8 @@ public void init(PinotConfiguration config) {
_retryDelayScaleFactor = config.getProperty(RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY, DEFAULT_RETRY_DELAY_SCALE_FACTOR);
_authProvider = AuthProviderUtils.extractAuthProvider(config, CommonConstants.KEY_OF_AUTH);
doInit(config);
_logger
.info("Initialized with retryCount: {}, retryWaitMs: {}, retryDelayScaleFactor: {}", _retryCount, _retryWaitMs,
_retryDelayScaleFactor);
_logger.info("Initialized with retryCount: {}, retryWaitMs: {}, retryDelayScaleFactor: {}", _retryCount,
_retryWaitMs, _retryDelayScaleFactor);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,16 @@
public class HttpSegmentFetcher extends BaseSegmentFetcher {
protected FileUploadDownloadClient _httpClient;

@Override
protected void doInit(PinotConfiguration config) {
_httpClient = new FileUploadDownloadClient(HttpClientConfig.newBuilder(config).build());
}

public HttpSegmentFetcher() {
}

@VisibleForTesting
protected HttpSegmentFetcher(FileUploadDownloadClient httpClient, PinotConfiguration config) {
void setHttpClient(FileUploadDownloadClient httpClient) {
_httpClient = httpClient;
_retryCount = config.getProperty(RETRY_COUNT_CONFIG_KEY, DEFAULT_RETRY_COUNT);
_retryWaitMs = config.getProperty(RETRY_WAIT_MS_CONFIG_KEY, DEFAULT_RETRY_WAIT_MS);
_retryDelayScaleFactor = config.getProperty(RETRY_DELAY_SCALE_FACTOR_CONFIG_KEY, DEFAULT_RETRY_DELAY_SCALE_FACTOR);
_logger
.info("Initialized with retryCount: {}, retryWaitMs: {}, retryDelayScaleFactor: {}", _retryCount, _retryWaitMs,
_retryDelayScaleFactor);
}

@Override
protected void doInit(PinotConfiguration config) {
if (_httpClient == null) {
_httpClient = new FileUploadDownloadClient(HttpClientConfig.newBuilder(config).build());
}
}

@Override
Expand All @@ -87,9 +80,8 @@ public void fetchSegmentToLocal(URI downloadURI, File dest)
httpHeaders.add(new BasicHeader(HttpHeaders.HOST, hostName + ":" + port));
}
int statusCode = _httpClient.downloadFile(uri, dest, _authProvider, httpHeaders);
_logger
.info("Downloaded segment from: {} to: {} of size: {}; Response status code: {}", uri, dest, dest.length(),
statusCode);
_logger.info("Downloaded segment from: {} to: {} of size: {}; Response status code: {}", uri, dest,
dest.length(), statusCode);
return true;
} catch (HttpErrorStatusException e) {
int statusCode = e.getStatusCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,19 @@
package org.apache.pinot.core.util;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections.ListUtils;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Helix.Instance;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.CommonConstants.Server;
import org.apache.pinot.spi.utils.StringUtil;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -47,93 +45,74 @@ public class PeerServerSegmentFinder {
private PeerServerSegmentFinder() {
}

private static final Logger _logger = LoggerFactory.getLogger(PeerServerSegmentFinder.class);
private static final Logger LOGGER = LoggerFactory.getLogger(PeerServerSegmentFinder.class);
private static final int MAX_NUM_ATTEMPTS = 5;
private static final int INITIAL_DELAY_MS = 500;
private static final double DELAY_SCALE_FACTOR = 2;

/**
*
* @param segmentName
* @param downloadScheme Can be either http or https.
* @param helixManager
* @return a list of uri strings of the form http(s)://hostname:port/segments/tablenameWithType/segmentName
* for the servers hosting ONLINE segments; empty list if no such server found.
* Returns a list of URIs of the form 'http(s)://hostname:port/segments/tableNameWithType/segmentName' for the servers
* hosting ONLINE segments; empty list if no such server found. The download scheme can be either 'http' or 'https'.
*/
public static List<URI> getPeerServerURIs(String segmentName, String downloadScheme, HelixManager helixManager) {
LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
String tableNameWithType =
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(llcSegmentName.getTableName());
return getPeerServerURIs(segmentName, downloadScheme, helixManager, tableNameWithType);
}

public static List<URI> getPeerServerURIs(String segmentName, String downloadScheme,
HelixManager helixManager, String tableNameWithType) {
public static List<URI> getPeerServerURIs(HelixManager helixManager, String tableNameWithType, String segmentName,
String downloadScheme) {
HelixAdmin helixAdmin = helixManager.getClusterManagmentTool();
String clusterName = helixManager.getClusterName();
if (clusterName == null) {
_logger.error("ClusterName not found");
return ListUtils.EMPTY_LIST;
}
final List<URI> onlineServerURIs = new ArrayList<>();
List<URI> onlineServerURIs = new ArrayList<>();
try {
RetryPolicies.exponentialBackoffRetryPolicy(MAX_NUM_ATTEMPTS, INITIAL_DELAY_MS, DELAY_SCALE_FACTOR)
.attempt(() -> {
getOnlineServersFromExternalView(segmentName, downloadScheme, tableNameWithType, helixAdmin, clusterName,
getOnlineServersFromExternalView(helixAdmin, clusterName, tableNameWithType, segmentName, downloadScheme,
onlineServerURIs);
return !onlineServerURIs.isEmpty();
});
} catch (AttemptsExceededException e) {
LOGGER.error("Failed to find ONLINE servers for segment: {} in table: {} after {} attempts", segmentName,
tableNameWithType, MAX_NUM_ATTEMPTS);
} catch (Exception e) {
_logger.error("Failure in getting online servers for segment {}", segmentName, e);
LOGGER.error("Caught exception while getting peer server URIs for segment: {} in table: {}", segmentName,
tableNameWithType, e);
}
return onlineServerURIs;
}

private static void getOnlineServersFromExternalView(String segmentName, String downloadScheme,
String tableNameWithType, HelixAdmin helixAdmin, String clusterName, List<URI> onlineServerURIs) {
ExternalView externalViewForResource =
HelixHelper.getExternalViewForResource(helixAdmin, clusterName, tableNameWithType);
if (externalViewForResource == null) {
_logger.warn("External View not found for table {}", tableNameWithType);
private static void getOnlineServersFromExternalView(HelixAdmin helixAdmin, String clusterName,
String tableNameWithType, String segmentName, String downloadScheme, List<URI> onlineServerURIs)
throws Exception {
ExternalView externalView = helixAdmin.getResourceExternalView(clusterName, tableNameWithType);
if (externalView == null) {
LOGGER.warn("Failed to find external view for table: {}", tableNameWithType);
return;
}
// Find out the ONLINE servers serving the segment.
Map<String, String> instanceToStateMap = externalViewForResource.getStateMap(segmentName);
for (Map.Entry<String, String> instanceState : instanceToStateMap.entrySet()) {
if ("ONLINE".equals(instanceState.getValue())) {
Map<String, String> instanceStateMap = externalView.getStateMap(segmentName);
if (instanceStateMap == null) {
LOGGER.warn("Failed to find segment: {} in table: {}", segmentName, tableNameWithType);
return;
}
for (Map.Entry<String, String> instanceState : instanceStateMap.entrySet()) {
if (SegmentStateModel.ONLINE.equals(instanceState.getValue())) {
String instanceId = instanceState.getKey();
_logger.info("Found ONLINE server {} for segment {}.", instanceId, segmentName);
LOGGER.info("Found ONLINE server: {} for segment: {} in table: {}", instanceId, segmentName, tableNameWithType);
InstanceConfig instanceConfig = helixAdmin.getInstanceConfig(clusterName, instanceId);
String hostName = instanceConfig.getHostName();
int port = getServerAdminPort(helixAdmin, clusterName, instanceId, downloadScheme);
try {
onlineServerURIs.add(new URI(StringUtil
.join("/", downloadScheme + "://" + hostName + ":" + port, "segments", tableNameWithType, segmentName)));
} catch (URISyntaxException e) {
_logger.warn("Error in uri syntax: ", e);
}
String adminPortKey = getAdminPortKey(downloadScheme);
int port = instanceConfig.getRecord().getIntField(adminPortKey, Server.DEFAULT_ADMIN_API_PORT);
onlineServerURIs.add(new URI(
StringUtil.join("/", downloadScheme + "://" + hostName + ":" + port, "segments", tableNameWithType,
segmentName)));
}
}
}

private static int getServerAdminPort(HelixAdmin helixAdmin, String clusterName, String instanceId,
String downloadScheme) {
try {
return Integer.parseInt(HelixHelper.getInstanceConfigsMapFor(instanceId, clusterName, helixAdmin)
.get(getServerAdminPortKey(downloadScheme)));
} catch (Exception e) {
_logger.warn("Failed to retrieve ADMIN PORT for instanceId {} in the cluster {} ", instanceId, clusterName, e);
return CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT;
}
}

private static String getServerAdminPortKey(String downloadScheme) {
private static String getAdminPortKey(String downloadScheme) {
switch (downloadScheme) {
case CommonConstants.HTTPS_PROTOCOL:
return CommonConstants.Helix.Instance.ADMIN_HTTPS_PORT_KEY;
case CommonConstants.HTTP_PROTOCOL:
return Instance.ADMIN_PORT_KEY;
case CommonConstants.HTTPS_PROTOCOL:
return Instance.ADMIN_HTTPS_PORT_KEY;
default:
return CommonConstants.Helix.Instance.ADMIN_PORT_KEY;
throw new IllegalArgumentException("Unsupported download scheme: " + downloadScheme);
}
}
}
Loading

0 comments on commit ec452a4

Please sign in to comment.