Skip to content

Commit

Permalink
Merge pull request #24 from EmmanuelMess/fix-working
Browse files Browse the repository at this point in the history
Replaced priorities with yield() and pipelined
  • Loading branch information
EmmanuelMess authored Dec 9, 2018
2 parents d6bcf34 + 0a496fc commit 37a9cec
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 113 deletions.
55 changes: 28 additions & 27 deletions lib/src/main/java/com/amaze/filepreloaderlibrary/Loader.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.amaze.filepreloaderlibrary.utils.*
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.withLock

Expand All @@ -31,67 +32,67 @@ internal class Loader<D: DataContainer>(private val clazz: Class<D>) {
* on each file (represented by its path) inside the folder.
*/
internal fun loadFrom(unit: ProcessUnit<D>) {
GlobalScope.launch {
var somethingAddedToPreload = false
val file = KFile(unit.path)
val file = KFile(unit.path)

processor.workHighPriority(GlobalScope.produce {
//Load current folder
getPreloadMapMutex(clazz).withLock {
if (getPreloadMap(clazz)[file.path] == null) {
val subfiles: Array<String> = file.list() ?: arrayOf()
for (filename in subfiles) {
addToProcess(file.path, ProcessUnit(file.path + DIVIDER + filename, unit.fetcherFunction), PRIORITY_NOW)
}

getPreloadMap(clazz)[file.path] = PreloadedFolder(subfiles.size)
if (getPreloadMap(clazz).size > PRELOADED_MAP_MAXIMUM) cleanOldEntries()
getDeleteQueue(clazz).add(file.path)

somethingAddedToPreload = somethingAddedToPreload || subfiles.isNotEmpty()

for (filename in subfiles) {
send(toPreloadable(file.path, ProcessUnit(file.path + DIVIDER + filename, unit.fetcherFunction), PRIORITY_NOW))
}
}
}

close()
})

processor.workLowPriority(GlobalScope.produce {
//Load children folders
file.listDirectoriesToList()?.forEach {
getPreloadMapMutex(clazz).withLock {
val currentPath = unit.path + DIVIDER + it

if (getPreloadMap(clazz)[currentPath] == null) {
val subfiles: Array<String> = KFile(currentPath).list() ?: arrayOf()
for (filename in subfiles) {
addToProcess(currentPath, ProcessUnit(currentPath + DIVIDER + filename, unit.fetcherFunction), PRIORITY_FUTURE)
}

getPreloadMap(clazz)[currentPath] = PreloadedFolder(subfiles.size)
if (getPreloadMap(clazz).size > PRELOADED_MAP_MAXIMUM) cleanOldEntries()
getDeleteQueue(clazz).add(currentPath)

somethingAddedToPreload = somethingAddedToPreload || subfiles.isNotEmpty()
for (filename in subfiles) {
send(toPreloadable(currentPath, ProcessUnit(currentPath + DIVIDER + filename, unit.fetcherFunction), PRIORITY_FUTURE))
}
}
}
}


//Load parent folder
getPreloadMapMutex(clazz).withLock {
val parentPath = file.parent
if (parentPath != null && getPreloadMap(clazz)[parentPath] == null) {
val subfiles: Array<String> = KFile(parentPath).list() ?: arrayOf()
subfiles.forEach {
addToProcess(parentPath, ProcessUnit(parentPath + DIVIDER + it, unit.fetcherFunction), PRIORITY_FUTURE)
}

getPreloadMap(clazz)[parentPath] = PreloadedFolder(subfiles.size)
if (getPreloadMap(clazz).size > PRELOADED_MAP_MAXIMUM) cleanOldEntries()
getDeleteQueue(clazz).add(parentPath)

somethingAddedToPreload = somethingAddedToPreload || subfiles.isNotEmpty()
subfiles.forEach {
send(toPreloadable(parentPath, ProcessUnit(parentPath + DIVIDER + it, unit.fetcherFunction), PRIORITY_FUTURE))
}
}
}

if(somethingAddedToPreload) {
processor.work()
}
}
close()
})
}

/**
Expand All @@ -105,18 +106,18 @@ internal class Loader<D: DataContainer>(private val clazz: Class<D>) {
val file = KFile(unit.path)
val fileList = file.list() ?: arrayOf()

for (path in fileList) {
addToProcess(file.path, ProcessUnit(file.absolutePath + DIVIDER + path, unit.fetcherFunction), PRIORITY_NOW)
val preloadableFiles = produce {
for (path in fileList) {
send(toPreloadable(file.path, ProcessUnit(file.absolutePath + DIVIDER + path, unit.fetcherFunction), PRIORITY_NOW))
}
}

getPreloadMapMutex(clazz).withLock {
getPreloadMap(clazz)[file.path] = PreloadedFolder(fileList.size)
getDeleteQueue(clazz).add(file.path)
}

if (fileList.isNotEmpty()) {
processor.work()
}
processor.workHighPriority(preloadableFiles)
}
}

Expand All @@ -134,7 +135,7 @@ internal class Loader<D: DataContainer>(private val clazz: Class<D>) {
/**
* Clear everything, all data loaded will be discarded.
*/
internal suspend fun clear() {
internal fun clear() {
processor.clear()
getPreloadMap(clazz).clear()
getDeleteQueue(clazz).clear()
Expand All @@ -143,11 +144,11 @@ internal class Loader<D: DataContainer>(private val clazz: Class<D>) {
/**
* Add file (represented by [unit]) to the [preloadPriorityQueue] to be preloaded by [loadFolder].
*/
private suspend fun addToProcess(path: String, unit: ProcessUnit<D>, priority: Int) {
private fun toPreloadable(path: String, unit: ProcessUnit<D>, priority: Int): PreloadableUnit<D> {
val start = if(priority == PRIORITY_NOW) CoroutineStart.DEFAULT else CoroutineStart.LAZY

val f = GlobalScope.async(start = start) { ProcessedUnit(path, unit.fetcherFunction(unit.path)) }
processor.add(PreloadableUnit(f, priority, unit.path.hashCode()))
return PreloadableUnit(f, priority)
}

/**
Expand Down
72 changes: 41 additions & 31 deletions lib/src/main/java/com/amaze/filepreloaderlibrary/Processor.kt
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package com.amaze.filepreloaderlibrary

import com.amaze.filepreloaderlibrary.PreloadedManager.getDeleteQueue
import com.amaze.filepreloaderlibrary.PreloadedManager.getPreloadMap
import com.amaze.filepreloaderlibrary.PreloadedManager.getPreloadMapMutex
import com.amaze.filepreloaderlibrary.datastructures.*
import com.amaze.filepreloaderlibrary.utils.*
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel

import java.util.*
import java.util.concurrent.atomic.AtomicInteger

/**
* Basically means call `[ProcessUnit].fetcherFunction` on each of `[ProcessUnit].path`'s files.
* This is done asynchly.
*
* @see Processor.work
* @see Processor.workHighPriority
*/
internal data class ProcessUnit<out D: DataContainer>(val path: String, val fetcherFunction: FetcherFunction<D>)

Expand All @@ -34,28 +34,33 @@ internal class Processor<D: DataContainer>(private val clazz: Class<D>) {
}
}

/**
* Thread safe.
* All the callable executions to load all the folders.
*
* 'Load a folder' means that the function `[unit].second` will be called
* on each file (represented by its path) inside the folder.
*/
private val preloadPriorityQueue: UniquePriorityBlockingQueue<PreloadableUnit<D>> = UniquePriorityBlockingQueue()

private val isWorking = AtomicBoolean(false)
private val ranCoroutines = Collections.synchronizedSet(hashSetOf<Job>())
private val workingWithHighPriority = AtomicInteger(0)

/**
* Calls each function in [preloadPriorityQueue] (removing it).
* Then adds the result [(path, data)] to `[getPreloadMap].get(path)`.
*/
internal fun work() {
if(isWorking.get()) return
isWorking.set(true)
internal fun workHighPriority(producer: ReceiveChannel<PreloadableUnit<D>?>) {
work(producer, {
workingWithHighPriority.incrementAndGet()
}) {
workingWithHighPriority.decrementAndGet()
}
}

GlobalScope.launch {
while (preloadPriorityQueue.isNotEmpty()) {
val elem = preloadPriorityQueue.poll() ?: throw IllegalStateException("Polled element cannot be null!")
internal fun workLowPriority(producer: ReceiveChannel<PreloadableUnit<D>?>) {
work(producer, {
while (workingWithHighPriority.get() != 0) yield()
})
}

private fun work(producer: ReceiveChannel<PreloadableUnit<D>?>, onStart: suspend () -> Unit, onEnd: suspend () -> Unit = {}) {
val job = GlobalScope.launch {
onStart()

for (elem in producer) {
elem ?: throw IllegalStateException("Polled element cannot be null!")
val (path, data) = elem.future.await()

DebugLog.log("FilePreloader.Processor", "[P${elem.priority}] Loading from $path: $data")
Expand All @@ -65,19 +70,24 @@ internal class Processor<D: DataContainer>(private val clazz: Class<D>) {
list.add(data)
}

isWorking.set(false)
onEnd()
}
}

internal suspend fun add(element: PreloadableUnit<D>) {
preloadPriorityQueue.add(element)
ranCoroutines.add(job)

job.invokeOnCompletion {
ranCoroutines.remove(job)
}
}

/**
* Clear everything, all data loaded will be discarded.
*/
internal suspend fun clear() {
preloadPriorityQueue.clear()
internal fun clear() {
GlobalScope.launch {
ranCoroutines.forEach {
it.cancelAndJoin()
}

ranCoroutines.clear()
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,5 @@ class SpecializedPreloader<out D: DataContainer>(private val clazz: Class<D>,
* It's usage is not recommended as the [Processor] already has a more efficient cleaning
* algorithm (see [Processor.deletionQueue]).
*/
internal suspend fun clear() = loader.clear()
internal fun clear() = loader.clear()
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,4 @@ package com.amaze.filepreloaderlibrary.datastructures
import com.amaze.filepreloaderlibrary.ProcessedUnit
import kotlinx.coroutines.Deferred

internal data class PreloadableUnit<D: DataContainer>(val future: Deferred<ProcessedUnit<D>>, val priority: Int, val hash: Int): Comparable<PreloadableUnit<D>> {
override fun compareTo(other: PreloadableUnit<D>) = priority.compareTo(other.priority)
override fun hashCode() = hash
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false

other as PreloadableUnit<*>

if (hash != other.hash) return false

return true
}
}
internal data class PreloadableUnit<D: DataContainer>(val future: Deferred<ProcessedUnit<D>>, val priority: Int)

This file was deleted.

0 comments on commit 37a9cec

Please sign in to comment.