Skip to content

Commit

Permalink
refactor: Remove unused class
Browse files Browse the repository at this point in the history
  • Loading branch information
tgodzik committed Jun 30, 2023
1 parent 408e993 commit ddd3e62
Showing 1 changed file with 0 additions and 35 deletions.
35 changes: 0 additions & 35 deletions frontend/src/main/scala/bloop/bsp/BspServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import java.nio.file.NoSuchFileException
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit

import scala.concurrent.Future
import scala.concurrent.Promise
import scala.util.control.NonFatal

Expand All @@ -22,15 +21,11 @@ import bloop.logging.DebugFilter
import bloop.task.Task

import jsonrpc4s._
import monix.execution.Ack
import monix.execution.Cancelable
import monix.execution.CancelablePromise
import monix.execution.Scheduler
import monix.execution.atomic.Atomic
import monix.reactive.Observable
import monix.reactive.Observer
import monix.reactive.OverflowStrategy
import monix.reactive.observers.Subscriber
import monix.reactive.subjects.BehaviorSubject

object BspServer {
Expand Down Expand Up @@ -216,34 +211,4 @@ object BspServer {
}
}

final class PumpOperator[A](pumpTarget: Observer.Sync[A], runningFuture: Cancelable)
extends Observable.Operator[A, A] {
def apply(out: Subscriber[A]): Subscriber[A] =
new Subscriber[A] { self =>
implicit val scheduler = out.scheduler
private[this] val isActive = Atomic(true)

def onNext(elem: A): Future[Ack] =
out.onNext(elem).syncOnContinue {
// Forward and ignore ack; safe because observer is sync
pumpTarget.onNext(elem)
()
}

def onComplete(): Unit = {
if (isActive.getAndSet(false))
out.onComplete()
}

def onError(ex: Throwable): Unit = {
if (isActive.getAndSet(false)) {
// Complete instead of forwarding error so that completeL finishes
out.onComplete()
runningFuture.cancel()
} else {
scheduler.reportFailure(ex)
}
}
}
}
}

0 comments on commit ddd3e62

Please sign in to comment.