Skip to content

Commit

Permalink
Merge pull request #723 from modelix/perf/bulk-sync-subtree-skipping
Browse files Browse the repository at this point in the history
MODELIX-889 Make skipping of subtrees in bulk-sync production-ready
  • Loading branch information
mhuster23 authored Jun 25, 2024
2 parents b4fd113 + 78d0bf5 commit a565505
Show file tree
Hide file tree
Showing 19 changed files with 1,795 additions and 617 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,17 @@ class ModelSyncGradlePlugin : Plugin<Project> {
) {
val baseDir = project.layout.buildDirectory.dir("model-sync").get().asFile.apply { mkdirs() }
val jsonDir = baseDir.resolve(syncDirection.name).apply { mkdir() }
val sourceTask = when (syncDirection.source) {
val source = syncDirection.source

val sourceTask = when (source) {
is LocalSource -> registerTasksForLocalSource(syncDirection, previousTask, jsonDir)
is ServerSource -> registerTasksForServerSource(syncDirection, project, previousTask, jsonDir)
is ServerSource -> {
if (source.baseRevision != null) {
previousTask
} else {
registerTasksForServerSource(syncDirection, project, previousTask, jsonDir)
}
}
else -> error("Unknown sync direction source")
}

Expand Down Expand Up @@ -196,9 +204,10 @@ class ModelSyncGradlePlugin : Plugin<Project> {
val localTarget = syncDirection.target as LocalTarget
val importName = "${syncDirection.name}ImportIntoMps"
val resolvedDependencies = mpsDependencies.resolvedConfiguration.files
val hasBaseRevision = (syncDirection.source as? ServerSource)?.baseRevision != null
val config = MPSRunnerConfig(
mainClassName = "org.modelix.mps.model.sync.bulk.MPSBulkSynchronizer",
mainMethodName = "importRepository",
mainMethodName = if (hasBaseRevision) "importRepositoryFromModelServer" else "importRepository",
classPathElements = resolvedDependencies.toList(),
mpsHome = localTarget.mpsHome,
workDir = jsonDir,
Expand All @@ -209,6 +218,10 @@ class ModelSyncGradlePlugin : Plugin<Project> {
"-Dmodelix.mps.model.sync.bulk.input.modules.prefixes=${syncDirection.includedModulePrefixes.joinToString(",")}",
"-Dmodelix.mps.model.sync.bulk.repo.path=${localTarget.repositoryDir?.absolutePath}",
"-Dmodelix.mps.model.sync.bulk.input.continueOnError=${syncDirection.continueOnError}",
"-Dmodelix.mps.model.sync.bulk.server.repository=${(syncDirection.source as ServerSource).repositoryId}".takeIf { hasBaseRevision },
"-Dmodelix.mps.model.sync.bulk.server.url=${(syncDirection.source as ServerSource).url}".takeIf { hasBaseRevision },
"-Dmodelix.mps.model.sync.bulk.server.version.hash=${(syncDirection.source as ServerSource).revision}".takeIf { hasBaseRevision },
"-Dmodelix.mps.model.sync.bulk.server.version.base.hash=${(syncDirection.source as ServerSource).baseRevision}".takeIf { hasBaseRevision },
"-Xmx${localTarget.mpsHeapSize}",
localTarget.mpsDebugPort?.let { "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=$it" },
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ data class ServerSource(
override var branchName: String? = null,
override var requestTimeoutSeconds: Int = DEFAULT_REQUEST_TIMEOUT_SECONDS,
var revision: String? = null,
var baseRevision: String? = null,
) : ServerEndpoint {
override fun getValidationErrors(): List<String> {
val errors = mutableListOf<String>()
Expand Down
6 changes: 6 additions & 0 deletions bulk-model-sync-lib/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ kotlin {
}
}

// Dependencies declared for `jvmMain` only: trove4j (resolved through the `libs` version catalog accessor).
val jvmMain by getting {
dependencies {
implementation(libs.trove4j)
}
}

val commonTest by getting {
dependencies {
implementation(project(":model-api"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2024.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.modelix.model.sync.bulk

import org.modelix.model.api.INode

/**
* A node association is responsible for storing the mapping between a source node and the imported target node.
* Provides efficient lookup of the mapping from previous synchronization runs.
*/
interface INodeAssociation {
/**
* Looks up the target node that is associated with the given source node.
*
* @param sourceNode node from the source side of the synchronization
* @return the associated target node, or `null` if no target node is known for [sourceNode]
*/
fun resolveTarget(sourceNode: INode): INode?

/**
* Records that [targetNode] is the target-side counterpart of [sourceNode].
*
* @param sourceNode node from the source side of the synchronization
* @param targetNode node on the target side that corresponds to [sourceNode]
*/
fun associate(sourceNode: INode, targetNode: INode)
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ import kotlin.jvm.JvmName
*
* Properties, references, and child links are synchronized for this node and all of its (in-)direct children.
*
* Changes to the behaviour of this class should also be reflected in [ModelImporter].
*
* @param root the root node to be updated
* @param continueOnError if true, ignore exceptions and continue.
* Enabling this might lead to inconsistent models.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,290 @@
/*
* Copyright (c) 2024.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.modelix.model.sync.bulk

import mu.KotlinLogging
import org.modelix.model.api.ConceptReference
import org.modelix.model.api.INode
import org.modelix.model.api.INodeReference
import org.modelix.model.api.IReferenceLink
import org.modelix.model.api.IReplaceableNode
import org.modelix.model.api.IRole
import org.modelix.model.api.isChildRoleOrdered
import org.modelix.model.api.remove
import org.modelix.model.data.NodeData

/**
 * Similar to [ModelImporter], but the input is two [INode] instances instead of [INode] and [NodeData].
 *
 * Changes to the behaviour of this class should also be reflected in [ModelImporter].
 *
 * [synchronize] runs three phases in order:
 * 1. nodes are synchronized recursively (properties, references, concept, children),
 * 2. references whose target could not be resolved during phase 1 are retried,
 * 3. target nodes that are no longer expected are removed.
 *
 * @param filter determines which nodes need synchronization.
 *          Nodes that do not match the filter are skipped and will remain unchanged.
 * @param sourceRoot root of the tree containing the expected nodes
 * @param targetRoot root of the tree that needs changes
 * @param nodeAssociation mapping between source and target nodes, that is used for internal optimizations
 */
class ModelSynchronizer(
    val filter: IFilter,
    val sourceRoot: INode,
    val targetRoot: INode,
    val nodeAssociation: INodeAssociation,
) {
    /** Target nodes scheduled for removal in the last phase of [synchronize]. */
    private val nodesToRemove: MutableSet<INode> = HashSet()

    /** References that could not be synchronized immediately because their target was not resolvable yet. */
    private val pendingReferences: MutableList<PendingReference> = ArrayList()
    private val logger = KotlinLogging.logger {}

    /**
     * Updates the tree below [targetRoot] so that it matches the tree below [sourceRoot],
     * restricted to the nodes selected by [filter].
     */
    fun synchronize() {
        logger.info { "Synchronizing nodes..." }
        synchronizeNode(sourceRoot, targetRoot)
        logger.info { "Synchronizing pending references..." }
        pendingReferences.forEach { pendingReference ->
            // All nodes have been synchronized at this point, so a reference that still cannot be resolved
            // will stay out of sync. Report it instead of dropping it silently.
            if (!pendingReference.trySyncReference()) {
                logger.warn {
                    "Reference target could not be resolved. The reference was not synchronized. " +
                        "sourceNode = ${pendingReference.sourceNode}, role = ${pendingReference.role.preferSource()}"
                }
            }
        }
        logger.info { "Removing extra nodes..." }
        nodesToRemove.filter { it.isValid }.forEach { it.remove() }
        logger.info { "Synchronization finished." }
    }

    /**
     * Associates [sourceNode] with [targetNode] and then, depending on [filter], either
     * synchronizes the node (including its children), only descends into its children, or skips the subtree.
     */
    private fun synchronizeNode(sourceNode: INode, targetNode: INode) {
        nodeAssociation.associate(sourceNode, targetNode)
        if (filter.needsSynchronization(sourceNode)) {
            logger.info { "Synchronizing changed node. sourceNode = $sourceNode" }
            synchronizeProperties(sourceNode, targetNode)
            synchronizeReferences(sourceNode, targetNode)

            val sourceConcept = sourceNode.getConceptReference()
            val targetConcept = targetNode.getConceptReference()

            // A concept mismatch can only be corrected if the target node supports replacement.
            val conceptCorrectedTargetNode = if (sourceConcept != targetConcept && targetNode is IReplaceableNode) {
                targetNode.replaceNode(sourceConcept?.getUID()?.let { ConceptReference(it) })
            } else {
                targetNode
            }

            syncChildren(sourceNode, conceptCorrectedTargetNode)
        } else if (filter.needsDescentIntoSubtree(sourceNode)) {
            // The node itself is unchanged, so every child is expected to already have an associated target.
            for (sourceChild in sourceNode.allChildren) {
                val targetChild = nodeAssociation.resolveTarget(sourceChild) ?: error("Expected target node was not found. sourceChild=$sourceChild")
                synchronizeNode(sourceChild, targetChild)
            }
        } else {
            logger.info { "Skipping subtree due to filter. root = $sourceNode" }
        }
    }

    /**
     * Synchronizes all reference links found on either node.
     * References that cannot be synchronized yet are queued in [pendingReferences].
     */
    private fun synchronizeReferences(
        sourceNode: INode,
        targetNode: INode,
    ) {
        iterateMergedRoles(sourceNode.getReferenceLinks(), targetNode.getReferenceLinks()) { role ->
            val pendingReference = PendingReference(sourceNode, targetNode, role)

            // If the reference target already exist we can synchronize it immediately and save memory between the
            // two synchronization phases.
            if (!pendingReference.trySyncReference()) {
                pendingReferences += pendingReference
            }
        }
    }

    /**
     * Copies the value of every property found on either node from [sourceNode] to [targetNode],
     * writing only when the value differs. The ID property is excluded, see [iterateMergedRoles].
     */
    private fun synchronizeProperties(
        sourceNode: INode,
        targetNode: INode,
    ) {
        iterateMergedRoles(sourceNode.getPropertyLinks(), targetNode.getPropertyLinks()) { role ->
            val oldValue = targetNode.getPropertyValue(role.preferTarget())
            val newValue = sourceNode.getPropertyValue(role.preferSource())
            if (oldValue != newValue) {
                targetNode.setPropertyValue(role.preferTarget(), newValue)
            }
        }
    }

    /**
     * Synchronizes the children of [targetParent] with those of [sourceParent], role by role.
     *
     * Missing children are created, children that exist elsewhere in the target are moved,
     * and children of [targetParent] that are not expected are added to [nodesToRemove].
     * Every expected child is then synchronized recursively.
     */
    private fun syncChildren(sourceParent: INode, targetParent: INode) {
        val allRoles = (sourceParent.allChildren.map { it.roleInParent } + targetParent.allChildren.map { it.roleInParent }).distinct()
        for (role in allRoles) {
            val sourceNodes = sourceParent.getChildren(role).toList()
            val targetNodes = targetParent.getChildren(role).toList()

            // Lazy, because it is only needed when the target role is empty.
            val allExpectedNodesDoNotExist by lazy {
                sourceNodes.all { sourceNode ->
                    val originalId = sourceNode.originalId()
                    checkNotNull(originalId) { "Specified node '$sourceNode' has no ID." }
                    nodeAssociation.resolveTarget(sourceNode) == null
                }
            }

            // optimization that uses the bulk operation .addNewChildren
            if (targetNodes.isEmpty() && allExpectedNodesDoNotExist) {
                targetParent.addNewChildren(role, -1, sourceNodes.map { it.getConceptReference() })
                    .zip(sourceNodes)
                    .forEach { (newChild, sourceChild) ->
                        checkNotNull(sourceChild.originalId()) { "Specified node '$sourceChild' has no ID." }
                        nodeAssociation.associate(sourceChild, newChild)
                        synchronizeNode(sourceChild, newChild)
                    }
                continue
            }

            // optimization for when there is no change in the child list
            // size check first to avoid querying the original ID
            if (sourceNodes.size == targetNodes.size && sourceNodes.map { it.originalId() } == targetNodes.map { it.originalId() }) {
                sourceNodes.zip(targetNodes).forEach { synchronizeNode(it.first, it.second) }
                continue
            }

            val isOrdered = targetParent.isChildRoleOrdered(role)

            val newlyCreatedIds = mutableSetOf<String>()

            sourceNodes.forEachIndexed { indexInImport, expected ->
                // Re-read the children on every iteration, because the previous iteration may have changed them.
                val existingChildren = targetParent.getChildren(role).toList()
                val expectedId = checkNotNull(expected.originalId()) { "Specified node '$expected' has no id" }
                // newIndex is the index on which to import the expected child.
                // It might be -1 if the child does not exist and should be added at the end.
                val newIndex = if (isOrdered) {
                    indexInImport
                } else {
                    // The `existingChildren` are only searched once for the expected element before changing.
                    // Therefore, indexing existing children will not be more efficient than iterating once.
                    // (For the moment, this is fine because we expect unordered children to be the exception.
                    // Reusable indexing would be possible if we switch from
                    // a depth-first import to a breadth-first import.)
                    existingChildren.indexOfFirst { existingChild -> existingChild.originalId() == expectedId }
                }
                // existingChildren.getOrNull handles `-1` as needed by returning `null`.
                val nodeAtIndex = existingChildren.getOrNull(newIndex)
                val expectedConcept = expected.getConceptReference()
                val childNode = if (nodeAtIndex?.originalId() != expectedId) {
                    val existingNode = nodeAssociation.resolveTarget(expected)
                    if (existingNode == null) {
                        val newChild = targetParent.addNewChild(role, newIndex, expectedConcept)
                        if (newChild.originalId() == null) {
                            newChild.setPropertyValue(NodeData.idPropertyKey, expectedId)
                        }
                        newChild.originalId()?.let { newlyCreatedIds.add(it) }
                        nodeAssociation.associate(expected, newChild)
                        newChild
                    } else {
                        // The existing child node is not only moved to a new index,
                        // it is potentially moved to a new parent and role.
                        targetParent.moveChild(role, newIndex, existingNode)
                        // If the old parent and old role synchronized before the move operation,
                        // the existing child node would have been marked as to be deleted.
                        // Now that it is used, it should not be deleted.
                        nodesToRemove.remove(existingNode)
                        existingNode
                    }
                } else {
                    nodeAtIndex
                }

                synchronizeNode(expected, childNode)
            }

            val expectedNodesIds = sourceNodes.map { it.originalId() }.toSet()
            // Do not use existingNodes, but call node.getChildren(role) because
            // the recursive synchronization in the meantime already removed some nodes from node.getChildren(role).
            nodesToRemove += targetParent.getChildren(role).filterNot { existingNode ->
                val id = existingNode.originalId()
                expectedNodesIds.contains(id) || newlyCreatedIds.contains(id)
            }
        }
    }

    /**
     * A single reference of [sourceNode] in [role] that has to be applied to [targetNode].
     */
    inner class PendingReference(val sourceNode: INode, val targetNode: INode, val role: MergedRole<IReferenceLink>) {
        /**
         * Tries to make the reference of [targetNode] match the reference of [sourceNode].
         *
         * @return `true` if the reference is up to date afterwards,
         *         `false` if the reference target has no associated target node yet
         * @throws IllegalStateException if the source reference is set but its target cannot be resolved in the source
         */
        fun trySyncReference(): Boolean {
            val expectedRef = sourceNode.getReferenceTargetRef(role.preferSource())
            if (expectedRef == null) {
                targetNode.setReferenceTarget(role.preferTarget(), null as INodeReference?)
                return true
            }
            val actualRef = targetNode.getReferenceTargetRef(role.preferTarget())

            // Some reference targets may be excluded from the sync,
            // in that case a serialized reference is stored and no lookup of the target is required.
            if (actualRef?.serialize() == expectedRef.serialize()) {
                // already up to date
                return true
            }

            val referenceTargetInSource = sourceNode.getReferenceTarget(role.preferSource())
            checkNotNull(referenceTargetInSource) { "Failed to resolve $expectedRef referenced by $sourceNode.${role.preferSource()}" }

            val referenceTargetInTarget = nodeAssociation.resolveTarget(referenceTargetInSource)
                ?: return false // Target cannot be resolved right now but might become resolvable later.

            if (referenceTargetInTarget.reference.serialize() != actualRef?.serialize()) {
                targetNode.setReferenceTarget(role.preferTarget(), referenceTargetInTarget)
            }
            return true
        }
    }

    private fun <T : IRole> iterateMergedRoles(
        sourceRoles: Iterable<T>,
        targetRoles: Iterable<T>,
        body: (role: MergedRole<T>) -> Unit,
    ) = iterateMergedRoles(sourceRoles.asSequence(), targetRoles.asSequence(), body)

    /**
     * Calls [body] once for every role UID that occurs in [sourceRoles] or [targetRoles].
     *
     * The role with the UID [NodeData.ID_PROPERTY_KEY] is excluded on both sides.
     * Excluding it only on the source side would still visit it whenever the target has the role,
     * and the target's ID property would then be overwritten with whatever the source returns for it.
     */
    private fun <T : IRole> iterateMergedRoles(
        sourceRoles: Sequence<T>,
        targetRoles: Sequence<T>,
        body: (role: MergedRole<T>) -> Unit,
    ) {
        val sourceRolesMap = sourceRoles.filter { it.getUID() != NodeData.ID_PROPERTY_KEY }.associateBy { it.getUID() }
        val targetRolesMap = targetRoles.filter { it.getUID() != NodeData.ID_PROPERTY_KEY }.associateBy { it.getUID() }
        val roleUIDs = sourceRolesMap.keys + targetRolesMap.keys
        for (roleUID in roleUIDs) {
            val sourceRole = sourceRolesMap[roleUID]
            val targetRole = targetRolesMap[roleUID]
            body(MergedRole(sourceRole, targetRole))
        }
    }

    /**
     * A role as seen from the source side, the target side, or both.
     * At least one of the two sides is expected to be non-null.
     */
    class MergedRole<E : IRole>(
        private val source: E?,
        private val target: E?,
    ) {
        /** @return the target-side role, falling back to the source-side role. */
        fun preferTarget(): E = (target ?: source)!!

        /** @return the source-side role, falling back to the target-side role. */
        fun preferSource(): E = (source ?: target)!!
    }

    /**
     * Determines, which nodes need synchronization and which can be skipped.
     *
     * It is valid for [needsDescentIntoSubtree] and [needsSynchronization] to return true for the same node.
     */
    interface IFilter {
        /**
         * Checks if a subtree needs synchronization.
         *
         * @param subtreeRoot root of the subtree to be checked
         * @return true iff the subtree must not be skipped
         */
        fun needsDescentIntoSubtree(subtreeRoot: INode): Boolean

        /**
         * Checks if a single node needs synchronization.
         *
         * @param node node to be checked
         * @return true iff the node must not be skipped
         */
        fun needsSynchronization(node: INode): Boolean
    }
}
Loading

0 comments on commit a565505

Please sign in to comment.