From 939ae891355847047b2934252f0b0fbcb332208c Mon Sep 17 00:00:00 2001 From: BCaxelbecker <46450522+BCaxelbecker@users.noreply.github.com> Date: Fri, 19 Jul 2019 15:05:03 +0200 Subject: [PATCH] Feature/add acquire host list interval support (#16) * support for acquireHostListInterval * update version --- pom.xml | 4 ++-- .../scala/com/arangodb/spark/ArangoOptions.scala | 4 +++- src/main/scala/com/arangodb/spark/ReadOptions.scala | 6 +++++- src/main/scala/com/arangodb/spark/WriteOptions.scala | 6 +++++- src/main/scala/com/arangodb/spark/package.scala | 4 ++++ .../arangodb/spark/java/ArangoSparkJavaReadTest.java | 12 +++++++++++- 6 files changed, 30 insertions(+), 6 deletions(-) diff --git a/pom.xml b/pom.xml index c86651a..eca7ca6 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ com.arangodb arangodb-spark-connector_2.12 - 1.0.10-SNAPSHOT + 1.0.11-SNAPSHOT 2016 jar @@ -245,7 +245,7 @@ com.arangodb arangodb-java-driver - 5.0.6 + 5.0.7 com.arangodb diff --git a/src/main/scala/com/arangodb/spark/ArangoOptions.scala b/src/main/scala/com/arangodb/spark/ArangoOptions.scala index 8d2f73f..5b730a3 100644 --- a/src/main/scala/com/arangodb/spark/ArangoOptions.scala +++ b/src/main/scala/com/arangodb/spark/ArangoOptions.scala @@ -28,6 +28,8 @@ trait ArangoOptions { def acquireHostList: Option[Boolean] = None + def acquireHostListInterval: Option[Int] = None + def loadBalancingStrategy: Option[LoadBalancingStrategy] = None - + } \ No newline at end of file diff --git a/src/main/scala/com/arangodb/spark/ReadOptions.scala b/src/main/scala/com/arangodb/spark/ReadOptions.scala index c04793b..c38cfcd 100644 --- a/src/main/scala/com/arangodb/spark/ReadOptions.scala +++ b/src/main/scala/com/arangodb/spark/ReadOptions.scala @@ -43,6 +43,7 @@ case class ReadOptions(override val database: String = "_system", override val protocol: Option[Protocol] = None, override val maxConnections: Option[Int] = None, override val acquireHostList: Option[Boolean] = None, + override val acquireHostListInterval: Option[Int] = None, override val loadBalancingStrategy: Option[LoadBalancingStrategy] = None) extends ArangoOptions { def this() = this(database = "_system") @@ -71,6 +72,8 @@ case class ReadOptions(override val database: String = "_system", def acquireHostList(acquireHostList: Boolean): ReadOptions = copy(acquireHostList = Some(acquireHostList)) + def acquireHostListInterval(acquireHostListInterval: Int): ReadOptions = copy(acquireHostListInterval = Some(acquireHostListInterval)) + def loadBalancingStrategy(loadBalancingStrategy: LoadBalancingStrategy): ReadOptions = copy(loadBalancingStrategy = Some(loadBalancingStrategy)) def copy(database: String = database, @@ -86,8 +89,9 @@ case class ReadOptions(override val database: String = "_system", protocol: Option[Protocol] = protocol, maxConnections: Option[Int] = maxConnections, acquireHostList: Option[Boolean] = acquireHostList, + acquireHostListInterval: Option[Int] = acquireHostListInterval, loadBalancingStrategy: Option[LoadBalancingStrategy] = loadBalancingStrategy): ReadOptions = { - ReadOptions(database, collection, partitioner, hosts, user, password, useSsl, sslKeyStoreFile, sslPassPhrase, sslProtocol, protocol, maxConnections, acquireHostList, loadBalancingStrategy) + ReadOptions(database, collection, partitioner, hosts, user, password, useSsl, sslKeyStoreFile, sslPassPhrase, sslProtocol, protocol, maxConnections, acquireHostList, acquireHostListInterval, loadBalancingStrategy) } } diff --git a/src/main/scala/com/arangodb/spark/WriteOptions.scala b/src/main/scala/com/arangodb/spark/WriteOptions.scala index 8c8357d..40beea4 100644 --- a/src/main/scala/com/arangodb/spark/WriteOptions.scala +++ b/src/main/scala/com/arangodb/spark/WriteOptions.scala @@ -37,6 +37,7 @@ case class WriteOptions(override val database: String = "_system", override val protocol: Option[Protocol] = None, override val maxConnections: Option[Int] = None, override val acquireHostList: Option[Boolean] = None, + override val acquireHostListInterval: Option[Int] = None, override val loadBalancingStrategy: Option[LoadBalancingStrategy] = None) extends ArangoOptions { def this() = this(database = "_system") @@ -63,6 +64,8 @@ case class WriteOptions(override val database: String = "_system", def acquireHostList(acquireHostList: Boolean): WriteOptions = copy(acquireHostList = Some(acquireHostList)) + def acquireHostListInterval(acquireHostListInterval: Int): WriteOptions = copy(acquireHostListInterval = Some(acquireHostListInterval)) + def loadBalancingStrategy(loadBalancingStrategy: LoadBalancingStrategy): WriteOptions = copy(loadBalancingStrategy = Some(loadBalancingStrategy)) def copy(database: String = database, @@ -76,8 +79,9 @@ case class WriteOptions(override val database: String = "_system", protocol: Option[Protocol] = protocol, maxConnections: Option[Int] = maxConnections, acquireHostList: Option[Boolean] = acquireHostList, + acquireHostListInterval: Option[Int] = acquireHostListInterval, loadBalancingStrategy: Option[LoadBalancingStrategy] = loadBalancingStrategy): WriteOptions = { - WriteOptions(database, hosts, user, password, useSsl, sslKeyStoreFile, sslPassPhrase, sslProtocol, protocol, maxConnections, acquireHostList, loadBalancingStrategy) + WriteOptions(database, hosts, user, password, useSsl, sslKeyStoreFile, sslPassPhrase, sslProtocol, protocol, maxConnections, acquireHostList, acquireHostListInterval, loadBalancingStrategy) } } \ No newline at end of file diff --git a/src/main/scala/com/arangodb/spark/package.scala b/src/main/scala/com/arangodb/spark/package.scala index d7ffb28..1d0c12e 100644 --- a/src/main/scala/com/arangodb/spark/package.scala +++ b/src/main/scala/com/arangodb/spark/package.scala @@ -46,6 +46,7 @@ package object spark { val PropertyProtocol = "arangodb.protocol" val PropertyMaxConnections = "arangodb.maxConnections" val PropertyAcquireHostList = "arangodb.acquireHostList" + val PropertyAcquireHostListInterval = "arangodb.acquireHostListInterval" val PropertyLoadBalancingStrategy = "arangodb.loadBalancingStrategy" private[spark] def createReadOptions(options: ReadOptions, sc: SparkConf): ReadOptions = { @@ -60,6 +61,7 @@ package object spark { protocol = options.protocol.orElse(some(Protocol.valueOf(sc.get(PropertyProtocol, "VST")))), maxConnections = options.maxConnections.orElse(some(Try(sc.get(PropertyMaxConnections, null).toInt).getOrElse(1))), acquireHostList = options.acquireHostList.orElse(some(Try(sc.get(PropertyAcquireHostList, null).toBoolean).getOrElse(false))), + acquireHostListInterval = options.acquireHostListInterval.orElse(some(Try(sc.get(PropertyAcquireHostListInterval, null).toInt).getOrElse(60000))), loadBalancingStrategy = options.loadBalancingStrategy.orElse(some(LoadBalancingStrategy.valueOf(sc.get(PropertyLoadBalancingStrategy, "NONE"))))) } @@ -75,6 +77,7 @@ package object spark { protocol = options.protocol.orElse(some(Protocol.valueOf(sc.get(PropertyProtocol, "VST")))), maxConnections = options.maxConnections.orElse(some(Try(sc.get(PropertyMaxConnections, null).toInt).getOrElse(1))), acquireHostList = options.acquireHostList.orElse(some(Try(sc.get(PropertyAcquireHostList, null).toBoolean).getOrElse(false))), + acquireHostListInterval = options.acquireHostListInterval.orElse(some(Try(sc.get(PropertyAcquireHostListInterval, null).toInt).getOrElse(60000))), loadBalancingStrategy = options.loadBalancingStrategy.orElse(some(LoadBalancingStrategy.valueOf(sc.get(PropertyLoadBalancingStrategy, "NONE"))))) } @@ -91,6 +94,7 @@ package object spark { options.protocol.foreach { builder.useProtocol(_) } options.maxConnections.foreach { builder.maxConnections(_) } options.acquireHostList.foreach { builder.acquireHostList(_) } + options.acquireHostListInterval.foreach { builder.acquireHostListInterval(_) } options.loadBalancingStrategy.foreach { builder.loadBalancingStrategy(_) } builder } diff --git a/src/test/java/com/arangodb/spark/java/ArangoSparkJavaReadTest.java b/src/test/java/com/arangodb/spark/java/ArangoSparkJavaReadTest.java index a4775f8..419ccaa 100644 --- a/src/test/java/com/arangodb/spark/java/ArangoSparkJavaReadTest.java +++ b/src/test/java/com/arangodb/spark/java/ArangoSparkJavaReadTest.java @@ -90,7 +90,17 @@ public void loadAllWithHTTP() { public void loadAllWithLoadBalancing() { // set acquireHostList to false, due our tests are running inside a nested docker container. Settings this option to true will result in wrong ports beeing used. // So we need to set those settings explicitly inside: 'src/test/resources/arangodb.properties' file - ArangoJavaRDD rdd = ArangoSpark.load(sc, COLLECTION, new ReadOptions().user("root").password("test").database(DB).acquireHostList(false).loadBalancingStrategy(LoadBalancingStrategy.ROUND_ROBIN), TestJavaEntity.class); + ArangoJavaRDD rdd = ArangoSpark.load( + sc, + COLLECTION, + new ReadOptions() + .user("root") + .password("test") + .database(DB) + .acquireHostList(false) + .loadBalancingStrategy(LoadBalancingStrategy.ROUND_ROBIN), TestJavaEntity.class); + assertThat(rdd.count(), is(100L)); } + }