Skip to content

Commit

Permalink
Merge pull request #895 from openzipkin/cleanup-sampler
Browse files Browse the repository at this point in the history
Cleans up dead collector sampler config
  • Loading branch information
adriancole committed Jan 4, 2016
2 parents 1c3819c + b17abf5 commit 35952e8
Show file tree
Hide file tree
Showing 11 changed files with 28 additions and 226 deletions.
8 changes: 5 additions & 3 deletions zipkin-collector-service/config/collector-cassandra.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,18 @@
* limitations under the License.
*/

import com.google.common.util.concurrent.Atomics
import com.twitter.app.App
import com.twitter.zipkin.builder.Builder
import com.twitter.zipkin.cassandra.CassandraSpanStoreFactory
import com.twitter.zipkin.collector.builder.{Adjustable, CollectorServiceBuilder, ZipkinServerBuilder}
import com.twitter.zipkin.collector.builder.{CollectorServiceBuilder, ZipkinServerBuilder}
import com.twitter.zipkin.receiver.kafka.KafkaSpanReceiverFactory
import com.twitter.zipkin.storage.{DependencyStore, SpanStore, Store}

val serverPort = sys.env.get("COLLECTOR_PORT").getOrElse("9410").toInt
val adminPort = sys.env.get("COLLECTOR_ADMIN_PORT").getOrElse("9900").toInt
val logLevel = sys.env.get("COLLECTOR_LOG_LEVEL").getOrElse("INFO")
val sampleRate = sys.env.get("COLLECTOR_SAMPLE_RATE").getOrElse("1.0").toDouble
val sampleRate = sys.env.get("COLLECTOR_SAMPLE_RATE").getOrElse("1.0").toFloat

object Factory extends App with CassandraSpanStoreFactory

Expand Down Expand Up @@ -56,5 +57,6 @@ CollectorServiceBuilder(
storeBuilder,
kafkaReceiver,
serverBuilder = ZipkinServerBuilder(serverPort, adminPort),
sampleRate = Atomics.newReference(sampleRate),
logLevel = logLevel
).sampleRate(Adjustable.local(sampleRate))
)
8 changes: 5 additions & 3 deletions zipkin-collector-service/config/collector-dev.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@
* limitations under the License.
*/

import com.google.common.util.concurrent.Atomics
import com.twitter.zipkin.anormdb.{DependencyStoreBuilder, SpanStoreBuilder}
import com.twitter.zipkin.collector.builder.{Adjustable, CollectorServiceBuilder, ZipkinServerBuilder}
import com.twitter.zipkin.collector.builder.{CollectorServiceBuilder, ZipkinServerBuilder}
import com.twitter.zipkin.receiver.kafka.KafkaSpanReceiverFactory
import com.twitter.zipkin.storage.Store
import com.twitter.zipkin.storage.anormdb.DB

val serverPort = sys.env.get("COLLECTOR_PORT").getOrElse("9410").toInt
val adminPort = sys.env.get("COLLECTOR_ADMIN_PORT").getOrElse("9900").toInt
val logLevel = sys.env.get("COLLECTOR_LOG_LEVEL").getOrElse("INFO")
val sampleRate = sys.env.get("COLLECTOR_SAMPLE_RATE").getOrElse("1.0").toDouble
val sampleRate = sys.env.get("COLLECTOR_SAMPLE_RATE").getOrElse("1.0").toFloat

val db = DB()

Expand All @@ -36,5 +37,6 @@ CollectorServiceBuilder(
storeBuilder,
kafkaReceiver,
serverBuilder = ZipkinServerBuilder(serverPort, adminPort),
sampleRate = Atomics.newReference(sampleRate),
logLevel = logLevel
).sampleRate(Adjustable.local(sampleRate))
)
8 changes: 5 additions & 3 deletions zipkin-collector-service/config/collector-mysql.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import com.google.common.util.concurrent.Atomics
import com.twitter.zipkin.anormdb.{DependencyStoreBuilder, SpanStoreBuilder}
import com.twitter.zipkin.collector.builder.{Adjustable, CollectorServiceBuilder, ZipkinServerBuilder}
import com.twitter.zipkin.collector.builder.{CollectorServiceBuilder, ZipkinServerBuilder}
import com.twitter.zipkin.receiver.kafka.KafkaSpanReceiverFactory
import com.twitter.zipkin.storage.Store
import com.twitter.zipkin.storage.anormdb.{DB, DBConfig, DBParams}

val serverPort = sys.env.get("COLLECTOR_PORT").getOrElse("9410").toInt
val adminPort = sys.env.get("COLLECTOR_ADMIN_PORT").getOrElse("9900").toInt
val logLevel = sys.env.get("COLLECTOR_LOG_LEVEL").getOrElse("INFO")
val sampleRate = sys.env.get("COLLECTOR_SAMPLE_RATE").getOrElse("1.0").toDouble
val sampleRate = sys.env.get("COLLECTOR_SAMPLE_RATE").getOrElse("1.0").toFloat

val db = DB(DBConfig(
name = "mysql",
Expand All @@ -33,5 +34,6 @@ CollectorServiceBuilder(
storeBuilder,
kafkaReceiver,
serverBuilder = ZipkinServerBuilder(serverPort, adminPort),
sampleRate = Atomics.newReference(sampleRate),
logLevel = logLevel
).sampleRate(Adjustable.local(sampleRate))
)

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ import ch.qos.logback.classic
import ch.qos.logback.classic.Level
import com.twitter.finagle.ThriftMux
import com.twitter.logging.Logger
import com.twitter.ostrich.admin.{RuntimeEnvironment, ServiceTracker}
import com.twitter.ostrich.admin.RuntimeEnvironment
import com.twitter.zipkin.builder.Builder
import com.twitter.zipkin.collector.filter.{SamplerFilter, ServiceStatsFilter}
import com.twitter.zipkin.collector.sampler.AdjustableGlobalSampler
import com.twitter.zipkin.collector.{SpanReceiver, ZipkinCollector}
import com.twitter.zipkin.config.sampler.{AdaptiveSamplerConfig, AdjustableRateConfig}
import com.twitter.zipkin.receiver.scribe.ScribeReceiver
import com.twitter.zipkin.storage.Store
import com.twitter.zipkin.thriftscala._
import org.slf4j.LoggerFactory
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference

/**
* Immutable builder for ZipkinCollector
Expand All @@ -43,8 +43,7 @@ case class CollectorServiceBuilder[T](
storeBuilder: Builder[Store],
receiver: Option[SpanReceiver.Processor => SpanReceiver] = None,
scribeCategories: Set[String] = Set("zipkin"),
sampleRateBuilder: Builder[AdjustableRateConfig] = Adjustable.local(1.0),
adaptiveSamplerBuilder: Option[Builder[AdaptiveSamplerConfig]] = None,
sampleRate: AtomicReference[Float] = new AtomicReference[Float](1.0f),
serverBuilder: ZipkinServerBuilder = ZipkinServerBuilder(9410, 9900),
logLevel: String = "INFO"
) extends Builder[RuntimeEnvironment => ZipkinCollector] {
Expand All @@ -54,16 +53,13 @@ case class CollectorServiceBuilder[T](

val log = Logger.get()

def sampleRate(c: Builder[AdjustableRateConfig]): CollectorServiceBuilder[T] = copy(sampleRateBuilder = c)
def adaptiveSampler(b: Builder[AdaptiveSamplerConfig]) = copy(adaptiveSamplerBuilder = Some(b))

def apply() = (runtime: RuntimeEnvironment) => {
serverBuilder.apply().apply(runtime)

log.info("Building store: %s".format(storeBuilder.toString))
val store = storeBuilder.apply()

val sampler = new SamplerFilter(new AdjustableGlobalSampler(sampleRateBuilder()))
val sampler = new SamplerFilter(new AdjustableGlobalSampler(sampleRate))

import com.twitter.zipkin.conversions.thrift._

Expand All @@ -78,13 +74,6 @@ case class CollectorServiceBuilder[T](
// initialize any alternate receiver, such as kafka
val rcv = receiver.map(_(process))

adaptiveSamplerBuilder foreach { builder =>
val config = builder.apply()
val service = config.apply()
service.start()
ServiceTracker.register(service)
}

new ZipkinCollector(server, store, rcv)
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@
*/
package com.twitter.zipkin.collector.sampler

import com.twitter.zipkin.config.sampler.AdjustableRateConfig
import java.util.concurrent.atomic.AtomicReference

/**
* Get the rate of sample from an adjustable source.
*/
class AdjustableGlobalSampler(sampleRateConfig: AdjustableRateConfig) extends GlobalSampler {
class AdjustableGlobalSampler(sampleRate: AtomicReference[Float]) extends GlobalSampler {

/**
* True: process trace
* False: drop trace on the floor
*/
override def apply(traceId: Long) : Boolean = {
if (sample(traceId, sampleRateConfig.get)) {
if (sample(traceId, sampleRate.get)) {
SAMPLER_PASSED.incr
true
} else {
Expand All @@ -47,7 +47,7 @@ class AdjustableGlobalSampler(sampleRateConfig: AdjustableRateConfig) extends Gl
* In addition, math.abs(Long.MinValue) = Long.MinValue due to overflow,
* so we treat Long.MinValue as Long.MaxValue
*/
def sample(traceId: Long, sampleRate: Double) : Boolean = {
def sample(traceId: Long, sampleRate: Float) : Boolean = {
if (sampleRate == 1) {
true
} else {
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,13 @@ package com.twitter.zipkin.collector.sampler
*
*/

import com.twitter.zipkin.config.sampler.AdjustableRateConfig
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar
import com.google.common.util.concurrent.Atomics
import org.scalatest.{FunSuite, Matchers}

class AdjustableGlobalSamplerSpec extends FunSuite with Matchers with MockitoSugar {
val adjustableConfig = mock[AdjustableRateConfig]
class AdjustableGlobalSamplerSpec extends FunSuite with Matchers {

test("keep 10% of traces") {
when(adjustableConfig.get) thenReturn 0.1

val sampler = new AdjustableGlobalSampler(adjustableConfig)
val sampler = new AdjustableGlobalSampler(Atomics.newReference(0.1f))

sampler(Long.MinValue) should be (false)
sampler(-1) should be (true)
Expand All @@ -38,9 +33,7 @@ class AdjustableGlobalSamplerSpec extends FunSuite with Matchers with MockitoSug
}

test("drop all traces") {
when(adjustableConfig.get) thenReturn 0

val sampler = new AdjustableGlobalSampler(adjustableConfig)
val sampler = new AdjustableGlobalSampler(Atomics.newReference(0f))

sampler(Long.MinValue) should be (false)
sampler(Long.MinValue + 1)
Expand All @@ -51,9 +44,7 @@ class AdjustableGlobalSamplerSpec extends FunSuite with Matchers with MockitoSug
}

test("keep all traces") {
when(adjustableConfig.get) thenReturn 1

val sampler = new AdjustableGlobalSampler(adjustableConfig)
val sampler = new AdjustableGlobalSampler(Atomics.newReference(1f))

sampler(Long.MinValue) should be (true)
-5000 to 5000 foreach { i =>
Expand Down
Loading

0 comments on commit 35952e8

Please sign in to comment.