Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Remote/LocalChange to have a single ChangeInfo #112

Merged
merged 5 commits into from
Jun 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ class EditorViewModel(private val client: Client) : ViewModel(), YorkieEditText.
private val _content = MutableSharedFlow<String>()
val content = _content.asSharedFlow()

private val _textChangeInfos = MutableSharedFlow<Pair<ActorID, OperationInfo.TextOpInfo>>()
val textChangeInfos = _textChangeInfos.asSharedFlow()
private val _textOpInfos = MutableSharedFlow<Pair<ActorID, OperationInfo.TextOpInfo>>()
val textOpInfos = _textOpInfos.asSharedFlow()

val removedPeers = client.events.filterIsInstance<Event.PeersChanged>()
.map { it.result }
Expand Down Expand Up @@ -59,19 +59,18 @@ class EditorViewModel(private val client: Client) : ViewModel(), YorkieEditText.
if (event is Document.Event.Snapshot) {
syncText()
} else if (event is Document.Event.RemoteChange) {
emitTextChanges(event.changeInfos)
emitTextOpInfos(event.changeInfo)
}
}
}
}

private suspend fun emitTextChanges(changeInfos: List<Document.Event.ChangeInfo>) {
val clientID = client.requireClientId()
changeInfos.filterNot { it.actorID == clientID }
.flatMap { (_, ops, actor) ->
ops.filterIsInstance<OperationInfo.TextOpInfo>().map { op -> actor to op }
}.forEach {
_textChangeInfos.emit(it)
private suspend fun emitTextOpInfos(changeInfo: Document.Event.ChangeInfo) {
if (changeInfo.actorID == client.requireClientId()) return

changeInfo.operations.filterIsInstance<OperationInfo.TextOpInfo>()
.forEach { opInfo ->
_textOpInfos.emit(changeInfo.actorID to opInfo)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class MainActivity : AppCompatActivity() {
}

launch {
viewModel.textChangeInfos.collect { (actor, opInfo) ->
viewModel.textOpInfos.collect { (actor, opInfo) ->
when (opInfo) {
is OperationInfo.EditOpInfo -> opInfo.handleContentChange()
is OperationInfo.SelectOpInfo -> opInfo.handleSelectChange(actor)
Expand Down
16 changes: 8 additions & 8 deletions yorkie/src/androidTest/kotlin/dev/yorkie/core/ClientTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,15 @@ class ClientTest {

val localSetEvent = assertIs<LocalChange>(document1Events.first())
val localSetOperation = assertIs<OperationInfo.SetOpInfo>(
localSetEvent.changeInfos.first().operations.first(),
localSetEvent.changeInfo.operations.first(),
)
assertEquals("k1", localSetOperation.key)
assertEquals("$", localSetEvent.changeInfos.first().operations.first().path)
assertEquals("$", localSetEvent.changeInfo.operations.first().path)
document1Events.clear()

val remoteSetEvent = assertIs<RemoteChange>(document2Events.first())
val remoteSetOperation = assertIs<OperationInfo.SetOpInfo>(
remoteSetEvent.changeInfos.first().operations.first(),
remoteSetEvent.changeInfo.operations.first(),
)
assertEquals("k1", remoteSetOperation.key)
document2Events.clear()
Expand Down Expand Up @@ -145,13 +145,13 @@ class ClientTest {

val remoteRemoveEvent = assertIs<RemoteChange>(document1Events.first())
val remoteRemoveOperation = assertIs<OperationInfo.RemoveOpInfo>(
remoteRemoveEvent.changeInfos.first().operations.first(),
remoteRemoveEvent.changeInfo.operations.first(),
)
assertEquals(localSetOperation.executedAt, remoteRemoveOperation.executedAt)

val localRemoveEvent = assertIs<LocalChange>(document2Events.first())
val localRemoveOperation = assertIs<OperationInfo.RemoveOpInfo>(
localRemoveEvent.changeInfos.first().operations.first(),
localRemoveEvent.changeInfo.operations.first(),
)
assertEquals(remoteSetOperation.executedAt, localRemoveOperation.executedAt)

Expand Down Expand Up @@ -365,7 +365,7 @@ class ClientTest {
},
launch(start = CoroutineStart.UNDISPATCHED) {
document3.events.filterIsInstance<RemoteChange>().collect { event ->
document3Ops.addAll(event.changeInfos.flatMap { it.operations })
document3Ops.addAll(event.changeInfo.operations)
}
},
)
Expand All @@ -392,8 +392,8 @@ class ClientTest {
// 03. c1 and c2 sync with push-only mode. So, the changes of c1 and c2
// are not reflected to each other.
// But, c3 can get the changes of c1 and c2, because c3 sync with pull-pull mode.
client1.pauseRemoteChange(document1)
client2.pauseRemoteChange(document2)
client1.pauseRemoteChanges(document1)
client2.pauseRemoteChanges(document2)
document1.updateAsync {
it["c1"] = 1
}.await()
Expand Down
19 changes: 8 additions & 11 deletions yorkie/src/androidTest/kotlin/dev/yorkie/core/DocumentTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import dev.yorkie.assertJsonContentEquals
import dev.yorkie.document.Document
import dev.yorkie.document.Document.DocumentStatus
import dev.yorkie.document.Document.Event
import dev.yorkie.document.Document.Event.ChangeInfo
import dev.yorkie.document.crdt.TextWithAttributes
import dev.yorkie.document.json.JsonArray
import dev.yorkie.document.json.JsonCounter
Expand Down Expand Up @@ -232,13 +231,13 @@ class DocumentTest {
launch(start = CoroutineStart.UNDISPATCHED) {
document1.events.filterIsInstance<Event.RemoteChange>()
.collect {
document1Ops.addAll(it.changeInfos.flatMap(ChangeInfo::operations))
document1Ops.addAll(it.changeInfo.operations)
}
},
launch(start = CoroutineStart.UNDISPATCHED) {
document2.events.filterIsInstance<Event.RemoteChange>()
.collect {
document2Ops.addAll(it.changeInfos.flatMap(ChangeInfo::operations))
document2Ops.addAll(it.changeInfo.operations)
}
},
)
Expand Down Expand Up @@ -359,21 +358,19 @@ class DocumentTest {
"events" to launch(start = CoroutineStart.UNDISPATCHED) {
document1.events.filterIsInstance<Event.RemoteChange>()
.collect {
document1Ops.addAll(it.changeInfos.flatMap(ChangeInfo::operations))
document1Ops.addAll(it.changeInfo.operations)
}
},
"todos" to launch(start = CoroutineStart.UNDISPATCHED) {
document1.events("$.todos").filterIsInstance<Event.RemoteChange>()
.collect {
document1TodosOps.addAll(it.changeInfos.flatMap(ChangeInfo::operations))
document1TodosOps.addAll(it.changeInfo.operations)
}
},
"counter" to launch(start = CoroutineStart.UNDISPATCHED) {
document1.events("$.counter").filterIsInstance<Event.RemoteChange>()
.collect {
document1CounterOps.addAll(
it.changeInfos.flatMap(ChangeInfo::operations),
)
document1CounterOps.addAll(it.changeInfo.operations)
}
},
)
Expand Down Expand Up @@ -488,19 +485,19 @@ class DocumentTest {
"events" to launch(start = CoroutineStart.UNDISPATCHED) {
document1.events.filterIsInstance<Event.RemoteChange>()
.collect {
document1Ops.addAll(it.changeInfos.flatMap(ChangeInfo::operations))
document1Ops.addAll(it.changeInfo.operations)
}
},
"todos" to launch(start = CoroutineStart.UNDISPATCHED) {
document1.events("$.todos.0").filterIsInstance<Event.RemoteChange>()
.collect {
document1TodosOps.addAll(it.changeInfos.flatMap(ChangeInfo::operations))
document1TodosOps.addAll(it.changeInfo.operations)
}
},
"obj" to launch(start = CoroutineStart.UNDISPATCHED) {
document1.events("$.obj.c1").filterIsInstance<Event.RemoteChange>()
.collect {
document1ObjOps.addAll(it.changeInfos.flatMap(ChangeInfo::operations))
document1ObjOps.addAll(it.changeInfo.operations)
}
},
)
Expand Down
2 changes: 1 addition & 1 deletion yorkie/src/main/kotlin/dev/yorkie/core/Client.kt
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ public class Client @VisibleForTesting internal constructor(
* Pauses the synchronization of remote changes,
* allowing only local changes to be applied.
*/
public fun pauseRemoteChange(document: Document) {
public fun pauseRemoteChanges(document: Document) {
changeSyncMode(document, SyncMode.PushOnly)
}

Expand Down
32 changes: 14 additions & 18 deletions yorkie/src/main/kotlin/dev/yorkie/document/Document.kt
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ public class Document(public val key: Key) {
val operationInfos = change.execute(root)
localChanges += change
changeID = change.id
val changeInfos = listOf(change.toChangeInfo(operationInfos))
eventStream.emit(Event.LocalChange(changeInfos))
val changeInfo = change.toChangeInfo(operationInfos)
eventStream.emit(Event.LocalChange(changeInfo))
true
}
}
Expand All @@ -112,33 +112,27 @@ public class Document(public val key: Key) {
when (event) {
is Event.Snapshot -> event
is Event.RemoteChange -> {
event.changeInfos.filterTargetChangeInfos(targetPath)
event.changeInfo.operations.filterTargetOpInfos(targetPath)
.takeIf { it.isNotEmpty() }
?.let {
Event.RemoteChange(it)
Event.RemoteChange(event.changeInfo.copy(operations = it))
}
}

is Event.LocalChange -> {
event.changeInfos.filterTargetChangeInfos(targetPath)
event.changeInfo.operations.filterTargetOpInfos(targetPath)
.takeIf { it.isNotEmpty() }
?.let {
Event.LocalChange(it)
Event.LocalChange(event.changeInfo.copy(operations = it))
}
}
}
}
}

private fun List<Event.ChangeInfo>.filterTargetChangeInfos(targetPath: String) =
mapNotNull { (message, operations, actor) ->
val targetOps = operations.filter { isSameElementOrChildOf(it.path, targetPath) }
if (targetOps.isEmpty()) {
null
} else {
Event.ChangeInfo(message, targetOps, actor)
}
}
private fun List<OperationInfo>.filterTargetOpInfos(targetPath: String): List<OperationInfo> {
return filter { isSameElementOrChildOf(it.path, targetPath) }
}

private fun isSameElementOrChildOf(element: String, parent: String): Boolean {
return if (parent == element) {
Expand Down Expand Up @@ -220,7 +214,9 @@ public class Document(public val key: Key) {
if (changesInfo.isEmpty()) {
return
}
eventStream.emit(Event.RemoteChange(changesInfo))
changesInfo.forEach { changeInfo ->
eventStream.emit(Event.RemoteChange(changeInfo))
}
}

private suspend fun ensureClone(): CrdtRoot = withContext(dispatcher) {
Expand Down Expand Up @@ -292,14 +288,14 @@ public class Document(public val key: Key) {
* An event that occurs when the document is changed by local changes.
*/
public class LocalChange internal constructor(
public val changeInfos: List<ChangeInfo>,
public val changeInfo: ChangeInfo,
) : Event

/**
* An event that occurs when the document is changed by remote changes.
*/
public class RemoteChange internal constructor(
public val changeInfos: List<ChangeInfo>,
public val changeInfo: ChangeInfo,
) : Event

/**
Expand Down
4 changes: 2 additions & 2 deletions yorkie/src/test/kotlin/dev/yorkie/document/DocumentTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ class DocumentTest {
assertEquals(1, events.size)
var event = events.first()
assertIs<Document.Event.LocalChange>(event)
var operations = event.changeInfos.first().operations
var operations = event.changeInfo.operations
assertEquals(2, operations.size)
assertTrue(operations.all { it is SetOpInfo })

Expand All @@ -159,7 +159,7 @@ class DocumentTest {
assertEquals(2, events.size)
event = events.last()
assertIs<Document.Event.LocalChange>(event)
operations = event.changeInfos.first().operations
operations = event.changeInfo.operations
assertEquals(2, operations.size)
assertTrue(operations.all { it is RemoveOpInfo })

Expand Down