From 67098fdd4a04793a1dc00d9b1f4e49d3e4e37dbd Mon Sep 17 00:00:00 2001
From: Philipp Dallig
Date: Wed, 17 Apr 2024 04:46:43 +0200
Subject: [PATCH] [ZEPPELIN-6006] Remove command line applications when downloading applications (#4746)

* Move files with Java

* Use Java to download external dependencies

* Improve code after review

* Correct mirror URL and compilation
---
 pom.xml                                       |   2 +
 zeppelin-interpreter-integration/pom.xml      |   7 +
 .../integration/FlinkIntegrationTest.java     |   3 +-
 .../integration/SparkIntegrationTest.java     |   2 +-
 .../SparkSubmitIntegrationTest.java           |  16 +-
 .../integration/ZSessionIntegrationTest.java  |   8 +-
 .../integration/ZeppelinFlinkClusterTest.java |   2 +-
 .../integration/ZeppelinSparkClusterTest.java |   5 +-
 zeppelin-test/pom.xml                         |  73 ++
 .../apache/zeppelin/test/DownloadRequest.java |  77 +++
 .../apache/zeppelin/test/DownloadUtils.java   | 629 ++++++++++++++++++
 .../zeppelin/test}/SemanticVersion.java       |   2 +-
 .../zeppelin/test/DownloadUtilsTest.java      |  80 +++
 .../src/test/resources/log4j2.properties      |  25 +
 zeppelin-zengine/pom.xml                      |   7 +
 .../integration/DownloadUtils.java            | 197 ------
 .../SparkInterpreterLauncherTest.java         |   4 +-
 17 files changed, 918 insertions(+), 221 deletions(-)
 create mode 100644 zeppelin-test/pom.xml
 create mode 100644 zeppelin-test/src/main/java/org/apache/zeppelin/test/DownloadRequest.java
 create mode 100644 zeppelin-test/src/main/java/org/apache/zeppelin/test/DownloadUtils.java
 rename {zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration => zeppelin-test/src/main/java/org/apache/zeppelin/test}/SemanticVersion.java (98%)
 create mode 100644 zeppelin-test/src/test/java/org/apache/zeppelin/test/DownloadUtilsTest.java
 create mode 100644 zeppelin-test/src/test/resources/log4j2.properties
 delete mode 100644 zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java

diff --git a/pom.xml b/pom.xml
index b9128bb3ce2..0d14167ef22 100644
--- a/pom.xml
+++ b/pom.xml
@@ -91,6 +91,7 @@
     <module>zeppelin-jupyter</module>
     <module>zeppelin-plugins</module>
     <module>zeppelin-distribution</module>
+    <module>zeppelin-test</module>
@@ -116,6 +117,7 @@
     1.7.35
     1.2.25
+    <log4j2.version>2.23.1</log4j2.version>
     0.13.0
     0.62.2
     2.8.9
diff --git a/zeppelin-interpreter-integration/pom.xml b/zeppelin-interpreter-integration/pom.xml
index c79306eb3ff..d0fc4683d52 100644
--- a/zeppelin-interpreter-integration/pom.xml
+++ b/zeppelin-interpreter-integration/pom.xml
@@ -99,6 +99,13 @@
       <scope>test</scope>
     </dependency>

+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>zeppelin-test</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.junit.jupiter</groupId>
       <artifactId>junit-jupiter-engine</artifactId>
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
index 0d8167ab785..f52d063ecc9 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.zeppelin.test.DownloadUtils;
 import org.apache.zeppelin.interpreter.ExecutionContext;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -32,9 +33,7 @@
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
 import
org.apache.zeppelin.interpreter.InterpreterSettingManager; -import org.apache.zeppelin.interpreter.integration.DownloadUtils; import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.slf4j.Logger; diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java index 579c0b1faa2..26dee4f8e44 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.maven.model.Model; import org.apache.maven.model.io.xpp3.MavenXpp3Reader; +import org.apache.zeppelin.test.DownloadUtils; import org.apache.zeppelin.interpreter.ExecutionContext; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; @@ -36,7 +37,6 @@ import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.interpreter.InterpreterSettingManager; -import org.apache.zeppelin.interpreter.integration.DownloadUtils; import org.codehaus.plexus.util.xml.pull.XmlPullParserException; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkSubmitIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkSubmitIntegrationTest.java index e0af99abfee..0ae1bcc1dbc 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkSubmitIntegrationTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkSubmitIntegrationTest.java @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.zeppelin.test.DownloadUtils; import org.apache.zeppelin.interpreter.ExecutionContext; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; @@ -33,7 +34,6 @@ import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.interpreter.InterpreterSettingManager; -import org.apache.zeppelin.interpreter.integration.DownloadUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -62,11 +62,9 @@ public class SparkSubmitIntegrationTest { @BeforeAll public static void setUp() throws IOException { - String sparkVersion = "3.4.1"; - String hadoopVersion = "3"; - LOGGER.info("Testing Spark Version: " + sparkVersion); - LOGGER.info("Testing Hadoop Version: " + hadoopVersion); - sparkHome = DownloadUtils.downloadSpark(sparkVersion, hadoopVersion); + LOGGER.info("Testing Spark Version: " + DownloadUtils.DEFAULT_SPARK_VERSION); + LOGGER.info("Testing Hadoop Version: " + DownloadUtils.DEFAULT_SPARK_HADOOP_VERSION); + sparkHome = DownloadUtils.downloadSpark(); hadoopCluster = new MiniHadoopCluster(); hadoopCluster.start(); @@ -102,7 
+100,7 @@ void testLocalMode() throws InterpreterException, YarnException { InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build(); InterpreterResult interpreterResult = sparkSubmitInterpreter.interpret("--master local --class org.apache.spark.examples.SparkPi --deploy-mode client " + - sparkHome + "/examples/jars/spark-examples_2.12-3.4.1.jar", context); + sparkHome + "/examples/jars/spark-examples_2.12-" + DownloadUtils.DEFAULT_SPARK_VERSION + ".jar", context); assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code(), interpreterResult.toString()); // no yarn application launched @@ -126,7 +124,7 @@ void testYarnMode() throws InterpreterException, YarnException { sparkSubmitInterpreter.interpret("--master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi " + "--conf spark.app.name=" + yarnAppName + " --conf spark.driver.memory=512m " + "--conf spark.executor.memory=512m " + - sparkHome + "/examples/jars/spark-examples_2.12-3.4.1.jar", context); + sparkHome + "/examples/jars/spark-examples_2.12-" + DownloadUtils.DEFAULT_SPARK_VERSION + ".jar", context); assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code(), interpreterResult.toString()); GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.FINISHED)); @@ -159,7 +157,7 @@ public void run() { sparkSubmitInterpreter.interpret("--master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi " + "--conf spark.app.name=" + yarnAppName + " --conf spark.driver.memory=512m " + "--conf spark.executor.memory=512m " + - sparkHome + "/examples/jars/spark-examples_2.12-3.4.1.jar", context); + sparkHome + "/examples/jars/spark-examples_2.12-" + DownloadUtils.DEFAULT_SPARK_VERSION + ".jar", context); assertEquals(InterpreterResult.Code.INCOMPLETE, interpreterResult.code(), interpreterResult.toString()); assertTrue(interpreterResult.toString().contains("Paragraph received a SIGTERM"), interpreterResult.toString()); } catch (InterpreterException e) { diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java index 68320512e62..a02b88eb374 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java @@ -18,13 +18,13 @@ package org.apache.zeppelin.integration; import org.apache.commons.io.IOUtils; +import org.apache.zeppelin.test.DownloadUtils; import org.apache.zeppelin.client.ClientConfig; import org.apache.zeppelin.client.ExecuteResult; import org.apache.zeppelin.client.websocket.SimpleMessageHandler; import org.apache.zeppelin.client.Status; import org.apache.zeppelin.client.ZSession; import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.interpreter.integration.DownloadUtils; import org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager; import org.apache.zeppelin.notebook.Notebook; import org.apache.zeppelin.rest.AbstractTestRestApi; @@ -68,7 +68,7 @@ public static void setUp() throws Exception { zConf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD.getVarName(), "10000"); notebook = TestUtils.getInstance(Notebook.class); - sparkHome = 
DownloadUtils.downloadSpark("3.4.1", "3"); + sparkHome = DownloadUtils.downloadSpark(); flinkHome = DownloadUtils.downloadFlink("1.17.1", "2.12"); } @@ -189,7 +189,7 @@ void testZSession_Spark() throws Exception { assertEquals(Status.FINISHED, result.getStatus(), result.toString()); assertEquals(1, result.getResults().size()); assertEquals("TEXT", result.getResults().get(0).getType()); - assertTrue(result.getResults().get(0).getData().contains("3.4.1"), result.getResults().get(0).getData()); + assertTrue(result.getResults().get(0).getData().contains(DownloadUtils.DEFAULT_SPARK_VERSION), result.getResults().get(0).getData()); assertEquals(0, result.getJobUrls().size()); // pyspark @@ -258,7 +258,7 @@ void testZSession_Spark_Submit() throws Exception { assertEquals(Status.FINISHED, result.getStatus(), result.toString()); assertEquals(1, result.getResults().size()); assertEquals("TEXT", result.getResults().get(0).getType()); - assertTrue(result.getResults().get(0).getData().contains("3.4.1"), result.getResults().get(0).getData()); + assertTrue(result.getResults().get(0).getData().contains(DownloadUtils.DEFAULT_SPARK_VERSION), result.getResults().get(0).getData()); assertEquals(0, result.getJobUrls().size()); // pyspark diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest.java index c14e002650d..1a19bf4a683 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinFlinkClusterTest.java @@ -18,8 +18,8 @@ package org.apache.zeppelin.integration; import org.apache.commons.io.IOUtils; +import org.apache.zeppelin.test.DownloadUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.interpreter.integration.DownloadUtils; import org.apache.zeppelin.notebook.Notebook; import org.apache.zeppelin.notebook.Paragraph; import org.apache.zeppelin.rest.AbstractTestRestApi; diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java index 3366f58a8b2..7c0cb00dd70 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java @@ -17,6 +17,7 @@ package org.apache.zeppelin.integration; import org.apache.commons.io.IOUtils; +import org.apache.zeppelin.test.DownloadUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.Input; @@ -29,9 +30,7 @@ import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.interpreter.InterpreterSettingManager; -import org.apache.zeppelin.interpreter.integration.DownloadUtils; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; -import org.apache.zeppelin.notebook.Note; import org.apache.zeppelin.notebook.Notebook; import org.apache.zeppelin.notebook.Paragraph; import org.apache.zeppelin.rest.AbstractTestRestApi; @@ -39,9 +38,7 @@ import org.apache.zeppelin.user.AuthenticationInfo; import 
org.apache.zeppelin.utils.TestUtils;
 import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/zeppelin-test/pom.xml b/zeppelin-test/pom.xml
new file mode 100644
index 00000000000..8973ecc2a4b
--- /dev/null
+++ b/zeppelin-test/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.zeppelin</groupId>
+    <artifactId>zeppelin</artifactId>
+    <version>0.12.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>zeppelin-test</artifactId>
+  <name>Zeppelin: Test</name>
+  <description>Zeppelin test code used in other modules</description>
+
+  <properties>
+    <progressbar.version>0.9.5</progressbar.version>
+  </properties>
+
+  <packaging>jar</packaging>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-compress</artifactId>
+      <version>${commons.compress.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>me.tongfei</groupId>
+      <artifactId>progressbar</artifactId>
+      <version>${progressbar.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-engine</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <version>${log4j2.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/zeppelin-test/src/main/java/org/apache/zeppelin/test/DownloadRequest.java b/zeppelin-test/src/main/java/org/apache/zeppelin/test/DownloadRequest.java
new file mode 100644
index 00000000000..50126acd0b4
--- /dev/null
+++ b/zeppelin-test/src/main/java/org/apache/zeppelin/test/DownloadRequest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.test;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Optional;
+
+public class DownloadRequest {
+  private final URL url;
+  private final Optional<URL> alternativeUrl;
+  private final int retries;
+
+  public static final int DEFAULT_RETRIES = 3;
+
+  public DownloadRequest(String url, int retries) throws MalformedURLException {
+    this(url, null, retries);
+  }
+
+  public DownloadRequest(String url, String alternativeUrl) throws MalformedURLException {
+    this(url, alternativeUrl, DEFAULT_RETRIES);
+  }
+
+  public DownloadRequest(String url, String alternativeUrl, int retries)
+      throws MalformedURLException {
+    if (alternativeUrl != null) {
+      this.url = new URL(url);
+      this.alternativeUrl = Optional.of(new URL(alternativeUrl));
+      this.retries = retries;
+    } else {
+      this.url = new URL(url);
+      this.alternativeUrl = Optional.empty();
+      this.retries = retries;
+    }
+  }
+
+  public DownloadRequest(URL url, int retries) {
+    this(url, null, retries);
+  }
+
+  public DownloadRequest(URL url, URL alternativeUrl) {
+    this(url, alternativeUrl, DEFAULT_RETRIES);
+  }
+
+  public DownloadRequest(URL url, URL alternativeUrl, int retries) {
+    this.url = url;
+    // ofNullable keeps the (URL, int) constructor, which passes null here, from throwing an NPE
+    this.alternativeUrl = Optional.ofNullable(alternativeUrl);
+    this.retries = retries;
+  }
+
+  public URL getUrl() {
+    return url;
+  }
+
+  public Optional<URL> getAlternativeUrl() {
+    return alternativeUrl;
+  }
+
+  public int getRetries() {
+    return retries;
+  }
+}
diff --git a/zeppelin-test/src/main/java/org/apache/zeppelin/test/DownloadUtils.java b/zeppelin-test/src/main/java/org/apache/zeppelin/test/DownloadUtils.java
new file mode 100644
index 00000000000..37332b6c224
--- /dev/null
+++ b/zeppelin-test/src/main/java/org/apache/zeppelin/test/DownloadUtils.java
@@ -0,0 +1,629 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.test;
+
+import org.apache.commons.compress.archivers.ArchiveEntry;
+import org.apache.commons.compress.archivers.ArchiveInputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import me.tongfei.progressbar.DelegatingProgressBarConsumer;
+import me.tongfei.progressbar.ProgressBar;
+import me.tongfei.progressbar.ProgressBarBuilder;
+import me.tongfei.progressbar.ProgressBarStyle;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.Optional;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+/**
+ * Utility class for downloading spark/flink/livy. This is used for spark/flink integration test.
+ */
+public class DownloadUtils {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DownloadUtils.class);
+
+  public static final String APACHE_MIRROR_ENV_KEY = "APACHE_MIRROR";
+  public static final String PROGRESS_BAR_UPDATE_INTERVAL_ENV_KEY = "PROGRESS_BAR_UPDATE_INTERVAL";
+
+  private static final String MIRROR_URL;
+  private static final String ARCHIVE_URL = "https://archive.apache.org/dist/";
+  private static final int PROGRESS_BAR_UPDATE_INTERVAL;
+
+  private static final String downloadFolder = System.getProperty("user.home") + "/.cache";
+  public static final String DEFAULT_SPARK_VERSION = "3.4.2";
+  public static final String DEFAULT_SPARK_HADOOP_VERSION = "3";
+
+
+  private DownloadUtils() {
+    throw new IllegalStateException("Utility class");
+  }
+
+  static {
+    try {
+      FileUtils.forceMkdir(new File(downloadFolder));
+      String envUrl = System.getenv(APACHE_MIRROR_ENV_KEY);
+      if (StringUtils.isNotBlank(envUrl)) {
+        MIRROR_URL = envUrl;
+      } else {
+        MIRROR_URL =
+            IOUtils.toString(new URL("https://www.apache.org/dyn/closer.lua?preferred=true"),
+                StandardCharsets.UTF_8);
+      }
+      String envProgressUpdateInterval = System.getenv(PROGRESS_BAR_UPDATE_INTERVAL_ENV_KEY);
+      if (StringUtils.isNotBlank(envProgressUpdateInterval)) {
+        PROGRESS_BAR_UPDATE_INTERVAL = Integer.parseInt(envProgressUpdateInterval);
+      } else {
+        PROGRESS_BAR_UPDATE_INTERVAL = 1000;
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Fail to create download folder: " + downloadFolder, e);
+    }
+  }
+
+  /**
+   * Download Spark with the default versions
+   *
+   * @return home of Spark installation
+   */
+  public static String downloadSpark() {
+    return downloadSpark(DEFAULT_SPARK_VERSION, DEFAULT_SPARK_HADOOP_VERSION, null);
+  }
+
+  /**
+   * Download a Spark distribution built against a specific Hadoop version
+   *
+   * @param sparkVersion
+   * @param hadoopVersion
+   * @return home of Spark installation
+   */
+  public static String downloadSpark(String sparkVersion, String hadoopVersion) {
+    return downloadSpark(sparkVersion, hadoopVersion, null);
+  }
+
+  /**
+   * Download a Spark distribution built against specific Hadoop and
Scala versions
+   *
+   * @param sparkVersion
+   * @param hadoopVersion
+   * @param scalaVersion
+   * @return home of Spark installation
+   */
+  public static String downloadSpark(String sparkVersion, String hadoopVersion,
+      String scalaVersion) {
+    File sparkFolder = new File(downloadFolder, "spark");
+    final File targetSparkHomeFolder;
+    if (StringUtils.isNotBlank(scalaVersion)) {
+      targetSparkHomeFolder = new File(sparkFolder,
+          "spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion + "-scala" + scalaVersion);
+    } else {
+      targetSparkHomeFolder = new File(sparkFolder,
+          "spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion);
+    }
+
+    return downloadSpark(sparkVersion, hadoopVersion, scalaVersion, targetSparkHomeFolder);
+  }
+
+  /**
+   * Download of a Spark distribution
+   *
+   * @param sparkVersion
+   * @param hadoopVersion
+   * @param scalaVersion
+   * @param targetSparkHomeFolder - where the Spark archive should be extracted
+   * @return home of Spark installation
+   */
+  public static String downloadSpark(String sparkVersion, String hadoopVersion, String scalaVersion,
+      File targetSparkHomeFolder) {
+    File sparkFolder = new File(downloadFolder, "spark");
+    sparkFolder.mkdir();
+    final String sparkVersionLog;
+    if (StringUtils.isBlank(scalaVersion)) {
+      sparkVersionLog = "Spark " + sparkVersion + "-" + hadoopVersion;
+    } else {
+      sparkVersionLog = "Spark " + sparkVersion + "-" + hadoopVersion + "-" + scalaVersion;
+    }
+    if (targetSparkHomeFolder.exists()) {
+      LOGGER.info("Skip to download {} as it is already downloaded.", sparkVersionLog);
+      return targetSparkHomeFolder.getAbsolutePath();
+    }
+    final File sparkTarGZ;
+    if (StringUtils.isBlank(scalaVersion)) {
+      sparkTarGZ =
+          new File(sparkFolder, "spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion + ".tgz");
+    } else {
+      sparkTarGZ = new File(sparkFolder, "spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion
+          + "-scala" + scalaVersion + ".tgz");
+    }
+
+    try {
+      URL mirrorURL = new URL(MIRROR_URL
+          + generateSparkDownloadURL(sparkVersion, hadoopVersion, scalaVersion));
+      URL archiveURL = new URL(ARCHIVE_URL
+          + generateSparkDownloadURL(sparkVersion, hadoopVersion, scalaVersion));
+      LOGGER.info("Download {}", sparkVersionLog);
+      download(new DownloadRequest(mirrorURL, archiveURL), sparkTarGZ);
+      ProgressBarBuilder pbb = new ProgressBarBuilder()
+          .setTaskName("Unarchive")
+          .setUnit("MiB", 1048576) // setting the progress bar to use MiB as the unit
+          .setStyle(ProgressBarStyle.ASCII)
+          .setUpdateIntervalMillis(PROGRESS_BAR_UPDATE_INTERVAL)
+          .setConsumer(new DelegatingProgressBarConsumer(LOGGER::info));
+      try (
+          InputStream fis = Files.newInputStream(sparkTarGZ.toPath());
+          InputStream pbis = ProgressBar.wrap(fis, pbb);
+          InputStream bis = new BufferedInputStream(pbis);
+          InputStream gzis = new GzipCompressorInputStream(bis);
+          ArchiveInputStream<TarArchiveEntry> o = new TarArchiveInputStream(gzis)) {
+        LOGGER.info("Unarchive {} to {}", sparkVersionLog, targetSparkHomeFolder);
+        unarchive(o, targetSparkHomeFolder, 1);
+        LOGGER.info("Unarchive {} done", sparkVersionLog);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to download spark", e);
+    }
+    return targetSparkHomeFolder.getAbsolutePath();
+  }
+
+
+  public static void download(String url, int retries, File dst) throws IOException {
+    download(new URL(url), retries, dst);
+  }
+
+  public static void download(DownloadRequest downloadRequest, File dst) throws IOException {
+    if (dst.exists()) {
+      LOGGER.info("Skip download of {} because it already exists", dst);
+    } else {
+      boolean urlDownload =
download(downloadRequest.getUrl(), downloadRequest.getRetries(), dst);
+      if (urlDownload) {
+        LOGGER.info("Download successful");
+        return;
+      }
+      Optional<URL> alternativeURL = downloadRequest.getAlternativeUrl();
+      if (alternativeURL.isPresent()) {
+        urlDownload = download(alternativeURL.get(), downloadRequest.getRetries(), dst);
+        if (urlDownload) {
+          LOGGER.info("Download from alternative URL successful");
+          return;
+        }
+      }
+      throw new IOException("Unable to download from " + downloadRequest.getUrl());
+    }
+  }
+
+  private static boolean download(URL url, int retries, File dst) {
+    int retry = 0;
+    while (retry < retries) {
+      try {
+        HttpURLConnection httpConnection = (HttpURLConnection) (url.openConnection());
+        long completeFileSize = httpConnection.getContentLength();
+        ProgressBarBuilder pbb = new ProgressBarBuilder()
+            .setTaskName("Download " + dst.getName())
+            .setUnit("MiB", 1048576) // setting the progress bar to use MiB as the unit
+            .setStyle(ProgressBarStyle.ASCII)
+            .setUpdateIntervalMillis(PROGRESS_BAR_UPDATE_INTERVAL)
+            .setInitialMax(completeFileSize)
+            .setConsumer(new DelegatingProgressBarConsumer(LOGGER::info));
+        try (
+            OutputStream fileOS = Files.newOutputStream(dst.toPath());
+            InputStream is = url.openStream();
+            InputStream pbis = ProgressBar.wrap(is, pbb);
+            InputStream bis = new BufferedInputStream(pbis)) {
+          IOUtils.copyLarge(bis, fileOS);
+          return true;
+        }
+      } catch (IOException e) {
+        LOGGER.info("Unable to download from {}", url, e);
+        ++retry;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * @param livyVersion
+   * @param scalaVersion
+   * @param targetLivyHomeFolder
+   * @return livyHome
+   */
+  public static String downloadLivy(String livyVersion, String scalaVersion,
+      File targetLivyHomeFolder) {
+    File livyDownloadFolder = new File(downloadFolder, "livy");
+    livyDownloadFolder.mkdir();
+    final String livyLog = StringUtils.isBlank(scalaVersion) ? "Livy " + livyVersion : "Livy "
+        + livyVersion + "_" + scalaVersion;
+    if (targetLivyHomeFolder.exists()) {
+      LOGGER.info("Skip to download {} as it is already downloaded.", livyLog);
+      return targetLivyHomeFolder.getAbsolutePath();
+    }
+    final File livyZip;
+    if (StringUtils.isBlank(scalaVersion)) {
+      // e.g.
apache-livy-0.7.1-incubating-bin.zip
+      livyZip = new File(livyDownloadFolder, "apache-livy-" + livyVersion + "-bin.zip");
+    } else {
+      // e.g. apache-livy-0.8.0-incubating_2.12-bin.zip
+      livyZip = new File(livyDownloadFolder,
+          "apache-livy-" + livyVersion + "_" + scalaVersion + "-bin.zip");
+    }
+
+    try {
+      URL mirrorURL = new URL(MIRROR_URL + generateLivyDownloadUrl(livyVersion, scalaVersion));
+      URL archiveURL = new URL(ARCHIVE_URL + generateLivyDownloadUrl(livyVersion, scalaVersion));
+      LOGGER.info("Download {}", livyLog);
+      download(new DownloadRequest(mirrorURL, archiveURL), livyZip);
+      LOGGER.info("Unzip {} to {}", livyLog, targetLivyHomeFolder);
+      ProgressBarBuilder pbb = new ProgressBarBuilder()
+          .setTaskName("Unarchive Livy")
+          .setUnit("MiB", 1048576) // setting the progress bar to use MiB as the unit
+          .setStyle(ProgressBarStyle.ASCII)
+          .setUpdateIntervalMillis(PROGRESS_BAR_UPDATE_INTERVAL)
+          .setConsumer(new DelegatingProgressBarConsumer(LOGGER::info));
+      try (InputStream fis = Files.newInputStream(livyZip.toPath());
+          InputStream pbis = ProgressBar.wrap(fis, pbb);
+          InputStream bis = new BufferedInputStream(pbis);
+          ZipInputStream zis = new ZipInputStream(bis)) {
+        unzip(zis, targetLivyHomeFolder, 1);
+      }
+      LOGGER.info("Unzip {} done", livyLog);
+      // Create logs directory
+      File logs = new File(targetLivyHomeFolder, "logs");
+      logs.mkdir();
+    } catch (MalformedURLException e) {
+      LOGGER.error("invalid URL", e);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to download livy", e);
+    }
+    return targetLivyHomeFolder.getAbsolutePath();
+  }
+
+  /**
+   * @param livyVersion
+   * @return livyHome
+   */
+  public static String downloadLivy(String livyVersion) {
+    File livyDownloadFolder = new File(downloadFolder, "livy");
+    File targetLivyHomeFolder = new File(livyDownloadFolder, "livy-" + livyVersion);
+    return downloadLivy(livyVersion, null, targetLivyHomeFolder);
+  }
+
+  public static String downloadLivy(String livyVersion, String scalaVersion) {
+    File livyDownloadFolder = new File(downloadFolder, "livy");
+    File targetLivyHomeFolder =
+        new File(livyDownloadFolder, "livy-" + livyVersion + "_" + scalaVersion);
+    return downloadLivy(livyVersion, scalaVersion, targetLivyHomeFolder);
+  }
+
+  private static File newFile(File destinationDir, ZipEntry zipEntry, int strip)
+      throws IOException {
+    String filename = zipEntry.getName();
+    for (int i = 0; i < strip; ++i) {
+      if (filename.contains(File.separator)) {
+        filename = filename.substring(filename.indexOf(File.separator) + 1);
+      }
+    }
+    File destFile = new File(destinationDir, filename);
+    String destDirPath = destinationDir.getCanonicalPath();
+    String destFilePath = destFile.getCanonicalPath();
+
+    if (!destFilePath.startsWith(destDirPath + File.separator)) {
+      throw new IOException("Entry is outside of the target dir: " + zipEntry.getName());
+    }
+
+    return destFile;
+  }
+
+  private static File newFile(File destDir, ArchiveEntry archiveEntry, int strip)
+      throws IOException {
+    String filename = archiveEntry.getName();
+    for (int i = 0; i < strip; ++i) {
+      if (filename.contains(File.separator)) {
+        filename = filename.substring(filename.indexOf(File.separator) + 1);
+      }
+    }
+    File destFile = new File(destDir, filename);
+    String destDirPath = destDir.getCanonicalPath();
+    String destFilePath = destFile.getCanonicalPath();
+
+    if (!destFilePath.startsWith(destDirPath + File.separator)) {
+      throw new IOException("Entry is outside of the target dir: " + archiveEntry.getName());
+    }
+
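+    // The zip-slip check above passed: the entry resolves inside the destination directory.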
+    return destFile;
+  }
+
+  private static void unarchive(ArchiveInputStream<TarArchiveEntry> ais, File destDir,
+      int strip) throws IOException {
+    byte[] buffer = new byte[1024];
+    ArchiveEntry archiveEntry = ais.getNextEntry();
+    while (archiveEntry != null) {
+      File newFile;
+      try {
+        newFile = newFile(destDir, archiveEntry, strip);
+      } catch (IOException e) {
+        LOGGER.info("Skip {}", archiveEntry.getName());
+        archiveEntry = ais.getNextEntry();
+        continue;
+      }
+      if (archiveEntry.isDirectory()) {
+        if (!newFile.isDirectory() && !newFile.mkdirs()) {
+          throw new IOException("Failed to create directory " + newFile);
+        }
+      } else {
+        File parent = newFile.getParentFile();
+        if (!parent.isDirectory() && !parent.mkdirs()) {
+          throw new IOException("Failed to create directory " + parent);
+        }
+
+        // write file content
+        try (FileOutputStream fos = new FileOutputStream(newFile)) {
+          int len;
+          while ((len = ais.read(buffer)) > 0) {
+            fos.write(buffer, 0, len);
+          }
+        }
+        // Change permissions and metadata
+        if (newFile.getParentFile().getName().contains("bin")
+            && !newFile.setExecutable(true, false)) {
+          LOGGER.info("Setting file {} to executable failed", newFile);
+        }
+        if (!newFile.setLastModified(archiveEntry.getLastModifiedDate().getTime())) {
+          LOGGER.info("Setting last modified date to file {} failed", newFile);
+        }
+      }
+      archiveEntry = ais.getNextEntry();
+    }
+  }
+
+  private static void unzip(ZipInputStream zis, File destDir, int strip) throws IOException {
+    byte[] buffer = new byte[1024];
+    ZipEntry zipEntry = zis.getNextEntry();
+    while (zipEntry != null) {
+      File newFile;
+      try {
+        newFile = newFile(destDir, zipEntry, strip);
+      } catch (IOException e) {
+        LOGGER.info("Skip {}", zipEntry.getName());
+        zipEntry = zis.getNextEntry();
+        continue;
+      }
+      if (zipEntry.isDirectory()) {
+        if (!newFile.isDirectory() && !newFile.mkdirs()) {
+          throw new IOException("Failed to create directory " + newFile);
+        }
+      } else {
+        File parent = newFile.getParentFile();
+        if (!parent.isDirectory() && !parent.mkdirs()) {
+          throw new IOException("Failed to create directory " + parent);
+        }
+
+        // write file content
+        try (FileOutputStream fos = new FileOutputStream(newFile)) {
+          int len;
+          while ((len = zis.read(buffer)) > 0) {
+            fos.write(buffer, 0, len);
+          }
+        }
+        // Change permissions and metadata
+        if (newFile.getParentFile().getName().contains("bin")
+            && !newFile.setExecutable(true, false)) {
+          LOGGER.info("Setting file {} to executable failed", newFile);
+        }
+        if (!newFile.setLastModified(zipEntry.getLastModifiedTime().toMillis())) {
+          LOGGER.info("Setting last modified date to file {} failed", newFile);
+        }
+      }
+      zipEntry = zis.getNextEntry();
+    }
+    zis.closeEntry();
+  }
+
+  public static String downloadFlink(String flinkVersion, String scalaVersion) {
+    File flinkDownloadFolder = new File(downloadFolder, "flink");
+    flinkDownloadFolder.mkdir();
+    File targetFlinkHomeFolder = new File(flinkDownloadFolder, "flink-" + flinkVersion);
+    if (targetFlinkHomeFolder.exists()) {
+      LOGGER.info("Skip to download Flink {}_{} as it is already downloaded.", flinkVersion,
+          scalaVersion);
+      return targetFlinkHomeFolder.getAbsolutePath();
+    }
+    File flinkTGZ = new File(flinkDownloadFolder,
+        "flink-" + flinkVersion + "-bin-scala_" + scalaVersion + ".tgz");
+    try {
+      URL mirrorURL = new URL(MIRROR_URL + generateDownloadURL(
+          "flink", flinkVersion, "-bin-scala_" + scalaVersion + ".tgz", "flink"));
+      URL archiveURL = new URL(ARCHIVE_URL + generateDownloadURL(
+          "flink", flinkVersion, "-bin-scala_" + scalaVersion + ".tgz", "flink"));
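+      // Try the preferred Apache mirror first and fall back to archive.apache.org on failure.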
LOGGER.info("Download Flink {}_{}", flinkVersion, scalaVersion); + download(new DownloadRequest(mirrorURL, archiveURL), flinkTGZ); + ProgressBarBuilder pbb = new ProgressBarBuilder() + .setTaskName("Unarchiv Flink") + .setUnit("MiB", 1048576) // setting the progress bar to use MiB as the unit + .setStyle(ProgressBarStyle.ASCII) + .setUpdateIntervalMillis(1000) + .setConsumer(new DelegatingProgressBarConsumer(LOGGER::info)); + try ( + InputStream fis = Files.newInputStream(flinkTGZ.toPath()); + InputStream pbis = ProgressBar.wrap(fis, pbb); + InputStream bis = new BufferedInputStream(pbis); + InputStream gzis = new GzipCompressorInputStream(bis); + ArchiveInputStream o = new TarArchiveInputStream(gzis)) { + LOGGER.info("Unarchive Flink {}_{} to {}", flinkVersion, scalaVersion, + targetFlinkHomeFolder); + unarchive(o, targetFlinkHomeFolder, 1); + LOGGER.info("Unarchive Flink done"); + } + } catch (IOException e) { + throw new RuntimeException("Unable to download flink", e); + } + + + // download other dependencies for running flink with yarn and hive + try { + download("https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_" + + scalaVersion + "/" + + flinkVersion + "/flink-connector-hive_" + scalaVersion + "-" + flinkVersion + ".jar", + 3, new File(targetFlinkHomeFolder, "lib" + File.separator + "flink-connector-hive_" + + scalaVersion + "-" + flinkVersion + ".jar")); + download("https://repo1.maven.org/maven2/org/apache/flink/flink-hadoop-compatibility_" + scalaVersion + "/" + + flinkVersion + "/flink-hadoop-compatibility_" + scalaVersion + "-" + flinkVersion + ".jar", + 3, new File(targetFlinkHomeFolder, "lib" + File.separator + "flink-hadoop-compatibility_" + + scalaVersion + "-" + flinkVersion + ".jar")); + download("https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.7/hive-exec-2.3.7.jar", + 3, new File(targetFlinkHomeFolder, "lib" + File.separator + "hive-exec-2.3.4.jar")); + download( + "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-api/3.3.6/hadoop-client-api-3.3.6.jar", + 3, new File(targetFlinkHomeFolder, + "lib" + File.separator + "hadoop-client-api-3.3.6.jar")); + download( + "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-runtime/3.3.6/hadoop-client-runtime-3.3.6.jar", + 3, new File(targetFlinkHomeFolder, + "lib" + File.separator + "hadoop-client-runtime-3.3.6.jar")); + download("https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-scala_" + + scalaVersion + "/" + + flinkVersion + "/flink-table-api-scala_" + scalaVersion + "-" + flinkVersion + ".jar", + 3, new File(targetFlinkHomeFolder, "lib" + File.separator + "flink-table-api-scala_" + + scalaVersion + "-" + flinkVersion + ".jar")); + download("https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-scala-bridge_" + + scalaVersion + "/" + + flinkVersion + "/flink-table-api-scala-bridge_" + scalaVersion + "-" + flinkVersion + ".jar", + 3, new File(targetFlinkHomeFolder, "lib" + File.separator + + "flink-table-api-scala-bridge_" + scalaVersion + "-" + flinkVersion + ".jar")); + + String jarName = "flink-table-planner_" + scalaVersion + "-" + flinkVersion + ".jar"; + mvFile( + targetFlinkHomeFolder + File.separator + "opt" + File.separator + jarName, + targetFlinkHomeFolder + File.separator + "lib" + File.separator + jarName); + jarName = "flink-table-planner-loader-" + flinkVersion + ".jar"; + mvFile( + targetFlinkHomeFolder + File.separator + "lib" + File.separator + jarName, + targetFlinkHomeFolder + File.separator + "opt" + File.separator + 
jarName);
+      if (SemanticVersion.of(flinkVersion).equalsOrNewerThan(SemanticVersion.of("1.16.0"))) {
+        jarName = "flink-sql-client-" + flinkVersion + ".jar";
+        mvFile(targetFlinkHomeFolder + File.separator + "opt" + File.separator + jarName,
+            targetFlinkHomeFolder + File.separator + "lib" + File.separator + jarName);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Fail to download jar", e);
+    }
+    return targetFlinkHomeFolder.getAbsolutePath();
+  }
+
+  private static void mvFile(String srcPath, String dstPath) throws IOException {
+    Path src = Paths.get(srcPath);
+    Path dst = Paths.get(dstPath);
+    if (src.toFile().exists()) {
+      if (dst.toFile().exists()) {
+        LOGGER.warn("{} already exists - replacing", dstPath);
+        FileUtils.deleteQuietly(dst.toFile());
+      }
+      LOGGER.info("Move file {} to {}", src, dst);
+      Files.move(src, dst, StandardCopyOption.REPLACE_EXISTING);
+    } else {
+      LOGGER.warn("{} does not exist - skipping", srcPath);
+    }
+  }
+
+  public static String downloadHadoop(String version) {
+    File hadoopDownloadFolder = new File(downloadFolder, "hadoop");
+    hadoopDownloadFolder.mkdir();
+    File targetHadoopHomeFolder = new File(hadoopDownloadFolder, "hadoop-" + version);
+    if (targetHadoopHomeFolder.exists()) {
+      LOGGER.info("Skip to download Hadoop {} as it is already downloaded.", version);
+      return targetHadoopHomeFolder.getAbsolutePath();
+    }
+    File hadoopTGZ = new File(hadoopDownloadFolder, "hadoop-" + version + ".tar.gz");
+    try {
+      URL mirrorURL = new URL(MIRROR_URL + generateDownloadURL(
+          "hadoop", version, ".tar.gz", "hadoop/core"));
+      URL archiveURL = new URL(ARCHIVE_URL + generateDownloadURL(
+          "hadoop", version, ".tar.gz", "hadoop/core"));
+      LOGGER.info("Download Hadoop {}", version);
+      download(new DownloadRequest(mirrorURL, archiveURL), hadoopTGZ);
+      ProgressBarBuilder pbb = new ProgressBarBuilder()
+          .setTaskName("Unarchive")
+          .setUnit("MiB", 1048576) // setting the progress bar to use MiB as the unit
+          .setStyle(ProgressBarStyle.ASCII)
+          .setUpdateIntervalMillis(PROGRESS_BAR_UPDATE_INTERVAL)
+          .setConsumer(new DelegatingProgressBarConsumer(LOGGER::info));
+      try (
+          InputStream fis = Files.newInputStream(hadoopTGZ.toPath());
+          InputStream pbis = ProgressBar.wrap(fis, pbb);
+          InputStream bis = new BufferedInputStream(pbis);
+          InputStream gzis = new GzipCompressorInputStream(bis);
+          ArchiveInputStream<TarArchiveEntry> o = new TarArchiveInputStream(gzis)) {
+        LOGGER.info("Unarchive Hadoop {} to {}", version, targetHadoopHomeFolder);
+        unarchive(o, targetHadoopHomeFolder, 1);
+        LOGGER.info("Unarchive Hadoop {} done", version);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to download hadoop", e);
+    }
+    return targetHadoopHomeFolder.getAbsolutePath();
+  }
+
+  private static String generateDownloadURL(String project, String version, String postFix,
+      String projectPath) {
+    return projectPath + "/" + project + "-" + version + "/" + project + "-" + version
+        + postFix;
+  }
+
+  private static String generateSparkDownloadURL(String sparkVersion, String hadoopVersion,
+      String scalaVersion) {
+    final String url;
+    String sparkVersionFolder = "spark/spark-" + sparkVersion;
+    if (StringUtils.isNotBlank(hadoopVersion)) {
+      if (StringUtils.isNotBlank(scalaVersion)) {
+        // spark-3.4.0-bin-hadoop3-scala2.13.tgz
+        url = sparkVersionFolder + "/spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion
+            + "-scala" + scalaVersion + ".tgz";
+      } else {
+        url =
+            sparkVersionFolder + "/spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion + ".tgz";
+      }
+    } else {
+      url = sparkVersionFolder + "/spark-" +
sparkVersion + "-bin-without-hadoop.tgz"; + } + return url; + } + + private static String generateLivyDownloadUrl(String livyVersion, String scalaVersion) { + SemanticVersion livy = SemanticVersion.of(livyVersion.replace("incubating", "")); + if (livy.equalsOrNewerThan(SemanticVersion.of("0.8.0"))) { + return "incubator/livy/" + livyVersion + "/apache-livy-" + livyVersion + "_" + scalaVersion + + "-bin.zip"; + } + return "incubator/livy/" + livyVersion + "/apache-livy-" + livyVersion + "-bin.zip"; + } +} diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/SemanticVersion.java b/zeppelin-test/src/main/java/org/apache/zeppelin/test/SemanticVersion.java similarity index 98% rename from zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/SemanticVersion.java rename to zeppelin-test/src/main/java/org/apache/zeppelin/test/SemanticVersion.java index f9bd771005d..f25e50353ee 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/SemanticVersion.java +++ b/zeppelin-test/src/main/java/org/apache/zeppelin/test/SemanticVersion.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.zeppelin.interpreter.integration; +package org.apache.zeppelin.test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/zeppelin-test/src/test/java/org/apache/zeppelin/test/DownloadUtilsTest.java b/zeppelin-test/src/test/java/org/apache/zeppelin/test/DownloadUtilsTest.java new file mode 100644 index 00000000000..6bed71683c8 --- /dev/null +++ b/zeppelin-test/src/test/java/org/apache/zeppelin/test/DownloadUtilsTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.zeppelin.test;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+
+@Disabled("Takes a long time and depends on external factors.")
+class DownloadUtilsTest {
+
+  @Test
+  void downloadHadoop() {
+    String hadoopHome = DownloadUtils.downloadHadoop("3.4.0");
+    Path hadoopHomePath = Paths.get(hadoopHome);
+    assertTrue(hadoopHomePath.toFile().exists());
+    assertTrue(hadoopHomePath.toFile().isDirectory());
+  }
+
+  @Test
+  void downloadSpark() {
+    String sparkHome = DownloadUtils.downloadSpark();
+    Path sparkHomePath = Paths.get(sparkHome);
+    assertTrue(sparkHomePath.toFile().exists());
+    assertTrue(sparkHomePath.toFile().isDirectory());
+  }
+
+  @Test
+  void downloadSparkWithScala() {
+    String sparkHome = DownloadUtils.downloadSpark(DownloadUtils.DEFAULT_SPARK_VERSION,
+        DownloadUtils.DEFAULT_SPARK_HADOOP_VERSION, "2.13");
+    Path sparkHomePath = Paths.get(sparkHome);
+    assertTrue(sparkHomePath.toFile().exists());
+    assertTrue(sparkHomePath.toFile().isDirectory());
+  }
+
+  @Test
+  void downloadFlink() {
+    String flinkHome = DownloadUtils.downloadFlink("1.16.3", "2.12");
+    Path flinkHomePath = Paths.get(flinkHome);
+    assertTrue(flinkHomePath.toFile().exists());
+    assertTrue(flinkHomePath.toFile().isDirectory());
+  }
+
+  @Test
+  void downloadLivy() {
+    String livyHome = DownloadUtils.downloadLivy("0.7.1-incubating");
+    Path livyHomePath = Paths.get(livyHome);
+    assertTrue(livyHomePath.toFile().exists());
+    assertTrue(livyHomePath.toFile().isDirectory());
+  }
+
+  @Test
+  void downloadLivy080() {
+    String livyHome = DownloadUtils.downloadLivy("0.8.0-incubating", "2.12");
+    Path livyHomePath = Paths.get(livyHome);
+    assertTrue(livyHomePath.toFile().exists());
+    assertTrue(livyHomePath.toFile().isDirectory());
+  }
+
+}
diff --git a/zeppelin-test/src/test/resources/log4j2.properties b/zeppelin-test/src/test/resources/log4j2.properties
new file mode 100644
index 00000000000..4935e68e004
--- /dev/null
+++ b/zeppelin-test/src/test/resources/log4j2.properties
@@ -0,0 +1,25 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger=debug, STDOUT
+
+appender.console.type = Console
+appender.console.name = STDOUT
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %5p [%d] ({%t} %F[%M]:%L) - %m%n
+
diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml
index cabe6442e1b..f3def93c889 100644
--- a/zeppelin-zengine/pom.xml
+++ b/zeppelin-zengine/pom.xml
@@ -267,6 +267,13 @@
       <scope>test</scope>
     </dependency>

+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>zeppelin-test</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
deleted file mode 100644
index 6310c1d60a2..00000000000
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.integration;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-
-/**
- * Utility class for downloading spark/flink. This is used for spark/flink integration test.
- */ -public class DownloadUtils { - private static Logger LOGGER = LoggerFactory.getLogger(DownloadUtils.class); - - private static String downloadFolder = System.getProperty("user.home") + "/.cache"; - - static { - try { - FileUtils.forceMkdir(new File(downloadFolder)); - } catch (IOException e) { - throw new RuntimeException("Fail to create download folder: " + downloadFolder, e); - } - } - - public static String downloadSpark(String sparkVersion, String hadoopVersion) { - String sparkDownloadFolder = downloadFolder + "/spark"; - File targetSparkHomeFolder = - new File(sparkDownloadFolder + "/spark-" + sparkVersion + "-bin-hadoop" + hadoopVersion); - if (targetSparkHomeFolder.exists()) { - LOGGER.info("Skip to download spark as it is already downloaded."); - return targetSparkHomeFolder.getAbsolutePath(); - } - download("spark", sparkVersion, "-bin-hadoop" + hadoopVersion + ".tgz"); - return targetSparkHomeFolder.getAbsolutePath(); - } - - public static String downloadFlink(String flinkVersion, String scalaVersion) { - String flinkDownloadFolder = downloadFolder + "/flink"; - File targetFlinkHomeFolder = new File(flinkDownloadFolder + "/flink-" + flinkVersion); - if (targetFlinkHomeFolder.exists()) { - LOGGER.info("Skip to download flink as it is already downloaded."); - return targetFlinkHomeFolder.getAbsolutePath(); - } - download("flink", flinkVersion, "-bin-scala_" + scalaVersion + ".tgz"); - // download other dependencies for running flink with yarn and hive - try { - runShellCommand(new String[]{"wget", - "https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_" + scalaVersion + "/" - + flinkVersion + "/flink-connector-hive_" + scalaVersion + "-" + flinkVersion + ".jar", - "-P", targetFlinkHomeFolder + "/lib"}); - runShellCommand(new String[]{"wget", - "https://repo1.maven.org/maven2/org/apache/flink/flink-hadoop-compatibility_" + scalaVersion + "/" - + flinkVersion + "/flink-hadoop-compatibility_" + scalaVersion + "-" + flinkVersion + ".jar", - "-P", targetFlinkHomeFolder + "/lib"}); - runShellCommand(new String[]{"wget", - "https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.7/hive-exec-2.3.7.jar", - "-P", targetFlinkHomeFolder + "/lib"}); - runShellCommand(new String[]{"wget", - "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-api/3.3.6/hadoop-client-api-3.3.6.jar", - "-P", targetFlinkHomeFolder + "/lib"}); - runShellCommand(new String[]{"wget", - "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-runtime/3.3.6/hadoop-client-runtime-3.3.6.jar", - "-P", targetFlinkHomeFolder + "/lib"}); - runShellCommand(new String[]{"wget", - "https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-scala_" + scalaVersion + "/" - + flinkVersion + "/flink-table-api-scala_" + scalaVersion + "-" + flinkVersion + ".jar", - "-P", targetFlinkHomeFolder + "/lib"}); - runShellCommand(new String[]{"wget", - "https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-scala-bridge_" + scalaVersion + "/" - + flinkVersion + "/flink-table-api-scala-bridge_" + scalaVersion + "-" + flinkVersion + ".jar", - "-P", targetFlinkHomeFolder + "/lib"}); - runShellCommand(new String[]{"mv", - targetFlinkHomeFolder + "/opt/" + "flink-table-planner_" + scalaVersion + "-" + flinkVersion + ".jar", - targetFlinkHomeFolder + "/lib"}); - runShellCommand(new String[]{"mv", - targetFlinkHomeFolder + "/lib/" + "flink-table-planner-loader-" + flinkVersion + ".jar", - targetFlinkHomeFolder + "/opt"}); - if 
(SemanticVersion.of(flinkVersion).equalsOrNewerThan(SemanticVersion.of("1.16.0"))) { - runShellCommand(new String[]{"mv", - targetFlinkHomeFolder + "/opt/" + "flink-sql-client-" + flinkVersion + ".jar", - targetFlinkHomeFolder + "/lib"}); - } - } catch (Exception e) { - throw new RuntimeException("Fail to download jar", e); - } - return targetFlinkHomeFolder.getAbsolutePath(); - } - - public static String downloadHadoop(String version) { - String hadoopDownloadFolder = downloadFolder + "/hadoop"; - File targetHadoopHomeFolder = new File(hadoopDownloadFolder + "/hadoop-" + version); - if (targetHadoopHomeFolder.exists()) { - LOGGER.info("Skip to download hadoop as it is already downloaded."); - return targetHadoopHomeFolder.getAbsolutePath(); - } - download("hadoop", version, ".tar.gz", "hadoop/core"); - return targetHadoopHomeFolder.getAbsolutePath(); - } - - // Try mirrors first, if fails fallback to apache archive - private static void download(String project, String version, String postFix, String projectPath) { - String projectDownloadFolder = downloadFolder + "/" + project; - try { - String preferredMirror = IOUtils.toString(new URL("https://www.apache.org/dyn/closer.lua?preferred=true"), StandardCharsets.UTF_8); - File downloadFile = new File(projectDownloadFolder + "/" + project + "-" + version + postFix); - String downloadURL = preferredMirror + "/" + projectPath + "/" + project + "-" + version + "/" + project + "-" + version + postFix; - runShellCommand(new String[]{"wget", downloadURL, "-P", projectDownloadFolder}); - runShellCommand(new String[]{"tar", "-xvf", downloadFile.getAbsolutePath(), "-C", projectDownloadFolder}); - } catch (Exception e) { - LOGGER.warn("Failed to download " + project + " from mirror site, fallback to use apache archive", e); - File downloadFile = new File(projectDownloadFolder + "/" + project + "-" + version + postFix); - String downloadURL = - "https://archive.apache.org/dist/" + projectPath + "/" + project +"-" - + version - + "/" + project + "-" - + version - + postFix; - try { - runShellCommand(new String[]{"wget", downloadURL, "-P", projectDownloadFolder}); - runShellCommand( - new String[]{"tar", "-xvf", downloadFile.getAbsolutePath(), "-C", projectDownloadFolder}); - } catch (Exception ex) { - throw new RuntimeException("Fail to download " + project + " " + version, ex); - } - } - } - - private static void download(String project, String version, String postFix) { - download(project, version, postFix, project); - } - - private static void runShellCommand(String[] commands) throws IOException, InterruptedException { - LOGGER.info("Starting shell commands: " + StringUtils.join(commands, " ")); - Process process = Runtime.getRuntime().exec(commands); - StreamGobbler errorGobbler = new StreamGobbler(process.getErrorStream()); - StreamGobbler outputGobbler = new StreamGobbler(process.getInputStream()); - errorGobbler.start(); - outputGobbler.start(); - if (process.waitFor() != 0) { - throw new IOException("Fail to run shell commands: " + StringUtils.join(commands, " ")); - } - LOGGER.info("Complete shell commands: " + StringUtils.join(commands, " ")); - } - - private static class StreamGobbler extends Thread { - InputStream is; - - // reads everything from is until empty. 
- StreamGobbler(InputStream is) { - this.is = is; - } - - @Override - public void run() { - try { - InputStreamReader isr = new InputStreamReader(is); - BufferedReader br = new BufferedReader(isr); - String line = null; - long startTime = System.currentTimeMillis(); - while ((line = br.readLine()) != null) { - // logging per 5 seconds - if ((System.currentTimeMillis() - startTime) > 5000) { - LOGGER.info(line); - startTime = System.currentTimeMillis(); - } - } - } catch (IOException ioe) { - LOGGER.warn("Fail to print shell output", ioe); - } - } - } -} diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java index e8d08e93219..7d778233fc3 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java @@ -19,9 +19,9 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.io.FileUtils; +import org.apache.zeppelin.test.DownloadUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.InterpreterOption; -import org.apache.zeppelin.interpreter.integration.DownloadUtils; import org.apache.zeppelin.interpreter.remote.ExecRemoteInterpreterProcess; import org.apache.zeppelin.util.Util; import org.junit.jupiter.api.BeforeEach; @@ -54,7 +54,7 @@ public void setUp() { System.clearProperty(confVar.getVarName()); } - sparkHome = DownloadUtils.downloadSpark("3.4.1", "3"); + sparkHome = DownloadUtils.downloadSpark(); System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), new File("..").getAbsolutePath());
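For reference, a minimal sketch of how a test can use the API this patch introduces. The method and constant names below come from the DownloadUtils and DownloadRequest sources added above; the two URLs in the DownloadRequest example are illustrative placeholders, not real artifacts.

import java.io.File;
import java.net.URL;

import org.apache.zeppelin.test.DownloadRequest;
import org.apache.zeppelin.test.DownloadUtils;

public class DownloadUtilsUsageExample {
  public static void main(String[] args) throws Exception {
    // Fetch (or reuse from ~/.cache) a Spark distribution built with the
    // defaults (DEFAULT_SPARK_VERSION / DEFAULT_SPARK_HADOOP_VERSION).
    String sparkHome = DownloadUtils.downloadSpark();
    System.out.println("SPARK_HOME = " + sparkHome);

    // Flink and Livy distributions follow the same download-and-cache pattern.
    String flinkHome = DownloadUtils.downloadFlink("1.17.1", "2.12");
    String livyHome = DownloadUtils.downloadLivy("0.8.0-incubating", "2.12");
    System.out.println(flinkHome + " / " + livyHome);

    // Single files go through DownloadRequest: the first URL is tried up to
    // DownloadRequest.DEFAULT_RETRIES times before the alternative URL is used.
    DownloadRequest request = new DownloadRequest(
        new URL("https://mirror.example.org/dist/some-artifact.tgz"),
        new URL("https://archive.apache.org/dist/some-artifact.tgz"));
    DownloadUtils.download(request, new File("/tmp/some-artifact.tgz"));
  }
}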