Skip to content
This repository has been archived by the owner on Jan 15, 2025. It is now read-only.

Commit

Permalink
added msg_rcpt.cause, fixed bug: only create new handshake when recei…
Browse files Browse the repository at this point in the history
…pt is rejection
  • Loading branch information
julius-b committed Dec 12, 2022
1 parent 562084c commit 5d5785e
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 48 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Opia

## End-to-End encrypted chat
- Using [Nikea](https://github.com/julius-b/nikea-kt) handshakes.

## TODO
- determine Retrofit host config using Build config (DEBUG/PROD)
- logging library
Expand Down
2 changes: 1 addition & 1 deletion android/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ dependencies {
implementation(project(":common"))
implementation(compose.material)
implementation("androidx.activity:activity-compose:1.6.1")
coreLibraryDesugaring("com.android.tools:desugar_jdk_libs:1.1.5")
coreLibraryDesugaring("com.android.tools:desugar_jdk_libs:1.2.2")

implementation("com.squareup.sqldelight:runtime:1.5.4")
implementation("com.squareup.sqldelight:coroutines-extensions:1.5.4")
Expand Down
2 changes: 1 addition & 1 deletion common/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,6 @@ android {
targetCompatibility = JavaVersion.VERSION_11
}
dependencies {
coreLibraryDesugaring("com.android.tools:desugar_jdk_libs:1.1.5")
coreLibraryDesugaring("com.android.tools:desugar_jdk_libs:1.2.2")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ data class ApiUpdatedReceipt(
val rcpt_ioid: UUID,
val dup: Long,
val hs_id: UUID?,
val cause: String?,
val recv_at: ZonedDateTime?,
val rjct_at: ZonedDateTime?,
val read_at: ZonedDateTime?,
Expand All @@ -71,7 +72,7 @@ data class ApiUpdatedReceipt(
val created_at: ZonedDateTime
) {
fun toMsgRcpt() = Msg_rcpt(
msg_id, rcpt_ioid, dup, hs_id, recv_at, rjct_at, read_at
msg_id, rcpt_ioid, dup, hs_id, cause, recv_at, rjct_at, read_at
)
}

Expand Down
90 changes: 46 additions & 44 deletions common/src/commonMain/kotlin/app/opia/common/sync/ChatSync.kt
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,15 @@ class ChatSync(
// ioid to (hs)
val newHandshakes = mutableMapOf<UUID, MutableList<ActiveHandshake>>()

fun reject(hsId: UUID, packets: List<ApiMsgPacket>, reason: String? = null) {
fun reject(hsId: UUID, packets: List<ApiMsgPacket>, reason: String) {
db.msgQueries.transaction {
afterRollback {
println("[!] Sync > rx > rollback: $hsId")
}

for (p in packets) {
val rcpt = Msg_rcpt(
p.msg_id, p.rcpt_ioid, p.dup, hsId, null, ZonedDateTime.now(), null
p.msg_id, p.rcpt_ioid, p.dup, hsId, reason, null, ZonedDateTime.now(), null
)
db.msgQueries.upsertReceipt(rcpt)
db.msgQueries.upsertReceiptSyncStatus(
Expand All @@ -97,26 +97,29 @@ class ChatSync(

for ((hsId, packets) in perHandshake) {
val peerIO = packets[0].from_ioid
// doesn't happen because server inserts the receipt with read_at
if (peerIO == sess.ioid) println("[~] Sync > rx [$peerIO/$hsId] > msg from self")
val hs: ActiveHandshake
if (activeHandshakes[peerIO]?.meta?.id == hsId) {
hs = activeHandshakes[peerIO]!!
} else {
println("[*] Sync > rx [$peerIO:$hsId] > not currently active, querying...")
println("[*] Sync > rx [$peerIO/$hsId] > hs not currently active, querying...")
val hsRes = di.messagingService.getHandshake(hsId)
when (hsRes) {
is NetworkResponse.ApiSuccess -> {
val apiHandshake = hsRes.body.data
println("[+] Sync > rx [$peerIO:$hsId] > remote lookup successful, created_at: ${apiHandshake.created_at}")
println("[+] Sync > rx [$peerIO/$hsId] > remote lookup successful, created_at: ${apiHandshake.created_at}")
// TODO remove, server wouldn't return it...
if (apiHandshake.initiator_ioid == sess.ioid) {
println("[~] Sync > rx [$peerIO:$hsId] > hs is self-initiated, can't restore - rejecting...")
reject(hsId, packets)
println("[~] Sync > rx [$peerIO/$hsId] > hs is self-initiated, can't restore - rejecting...")
reject(hsId, packets, "rx_self_initiated_hs_lost")
break
}
val init = initHandshake(apiHandshake, isInitiator = false)
val err = init.getError()
if (err != null) {
println("[!] Sync > rx [$peerIO:$hsId] > failed to init hs: $err")
reject(hsId, packets, err.name)
println("[!] Sync > rx [$peerIO/$hsId] > failed to init hs: $err")
reject(hsId, packets, "rx_init/${err.name}")
break
}
hs = init.unwrap()
Expand All @@ -127,12 +130,12 @@ class ChatSync(
}
is NetworkResponse.ApiError -> {
// eg. bc self-initiated: {"code":"forbidden","errors":{"initiator_ioid":[{"code":"forbidden"}]}}
println("[~] Sync > rx [$peerIO:$hsId] > remote lookup failed - rejecting...")
reject(hsId, packets)
println("[~] Sync > rx [$peerIO/$hsId] > remote lookup failed - rejecting...")
reject(hsId, packets, "rx_remote_lookup/${hsRes.body.code}")
break
}
else -> {
println("[!] Sync > rx [$peerIO:$hsId] > api lookup failed, assuming temporary - ignoring all #${packets.size} packets")
println("[!] Sync > rx [$peerIO/$hsId] > api lookup failed, assuming temporary - ignoring all #${packets.size} packets")
break
}
}
Expand All @@ -142,19 +145,19 @@ class ChatSync(
val raw = Base64.getDecoder().decode(p.payload_enc).toUByteArray()
val n = hs.hs.rx.n.toLong()
if (n != p.seqno) {
println("[*] Sync > rx [$peerIO:$hsId@${p.dup}] > n: $n, seqno: ${p.seqno}")
println("[*] Sync > rx [$peerIO/$hsId@${p.dup}] > n: $n, seqno: ${p.seqno}")
// NOTE: if it fails it's (likely) the first one
reject(hsId, packets)
reject(hsId, packets, "rx_seqno/$n/${p.seqno}")
break
}
val payload = hs.hs.rx.decrypt(raw).toByteArray().toUtf8()
println("[+] Sync > rx [$peerIO:$hsId@${p.dup}] > payload: $payload")
println("[+] Sync > rx [$peerIO/$hsId@${p.dup}] > payload: '$payload'")
val rcpt = Msg_rcpt(
p.msg_id, p.rcpt_ioid, p.dup, hsId, ZonedDateTime.now(), null, null
p.msg_id, p.rcpt_ioid, p.dup, hsId, null, ZonedDateTime.now(), null, null
)
db.msgQueries.transaction {
afterRollback {
println("[!] Sync > rx [$peerIO:$hsId@${p.dup}] > rollback")
println("[!] Sync > rx [$peerIO/$hsId@${p.dup}] > rollback")
}
db.msgQueries.insert(msg)
db.msgQueries.insertPayload(Msg_payload(msg.id, payload))
Expand Down Expand Up @@ -187,7 +190,7 @@ class ChatSync(
for (newHs in newPeerHandshakes) {
val currentHs = activeHandshakes[peerIO] // every loop
if (currentHs == null || currentHs.meta.created_at.isBefore(newHs.meta.created_at)) {
println("[+] Sync > rx [$peerIO] > current handshake outdated, using new: ${newHs.meta.id}")
println("[+] Sync > rx [$peerIO] > current handshake (${currentHs?.meta?.id}) outdated, using new: ${newHs.meta.id}")
activeHandshakes[peerIO] = newHs
}
}
Expand Down Expand Up @@ -231,14 +234,14 @@ class ChatSync(
for ((dup, updsForDup) in upds.groupBy { it.dup }) {
val u = updsForDup.maxBy { it.created_at }
db.msgQueries.upsertReceipt(
Msg_rcpt(msgId, u.rcpt_ioid, dup, u.hs_id, u.recv_at, u.rjct_at, u.read_at)
Msg_rcpt(msgId, u.rcpt_ioid, dup, u.hs_id, u.cause, u.recv_at, u.rjct_at, u.read_at)
)
// delete updates in order (ensure oldest is deleted before newest is deleted)

for (upd in updsForDup.sortedBy { it.created_at }) {
// uploadRejected only sees the latest rejections, which should suffice but somehow doesn't
if (activeHandshakes[upd.rcpt_ioid]?.meta?.id == upd.hs_id) {
println("[~] Sync > rcpts > deleting active handshake: ${upd.hs_id}")
if (upd.rjct_at != null && activeHandshakes[upd.rcpt_ioid]?.meta?.id == upd.hs_id) {
println("[~] Sync > rcpts > got rjct, deleting active handshake: ${upd.hs_id}")
activeHandshakes.remove(upd.rcpt_ioid)
}
val delRes = di.messagingService.deleteReceiptUpdate(
Expand Down Expand Up @@ -334,7 +337,7 @@ class ChatSync(
println("[*] Sync > tx > unsynced: #${unsynced.size}")

// need to loop per msg first for initial receipt creation
unsynced.forEach tx@{ msg ->
for (msg in unsynced) {
println("[*] Sync > tx [+${msg.id} -> ${msg.rcpt_id}] > preparing msg: '${msg.payload}'")
// TODO don't query IOs every time if rcpt is the same...
val rcptIOsRes = di.messagingService.listIOs(msg.rcpt_id)
Expand All @@ -347,88 +350,87 @@ class ChatSync(
println("[+] Sync > tx [+${msg.id} -> ${msg.rcpt_id}] > peer IOs (#${rcptIOs.size}): $rcptIOs")

val packets = mutableListWithCap<ApiMsgPacket>(rcptIOs.size)
// for ((_rcptIdx, rcptIO) in rcptIOs.withIndex()) {
rcptIOs.forEachIndexed rcptIO@{ _rcptIdx, rcptIO ->
val rcptIdx = _rcptIdx + 1
rcptIO@for ((rcptIdx, rcptIO) in rcptIOs.withIndex()) {
println("[*] Sync > tx [-> ${msg.rcpt_id}/$rcptIdx] > active hs: ${activeHandshakes.get(rcptIO)?.meta?.id}")
if (!activeHandshakes.containsKey(rcptIO)) {
if (!keyed.contains(rcptIO)) {
println("[~] Sync > tx [${msg.rcpt_id}/$rcptIdx] > peer has no keys, skipping...")
return@rcptIO
println("[~] Sync > tx [-> ${msg.rcpt_id}/$rcptIdx] > peer has no keys, skipping...")
continue@rcptIO
}
println("[*] Sync > tx [${msg.rcpt_id}/$rcptIdx] > no active handshake, initiating...")
println("[*] Sync > tx [-> ${msg.rcpt_id}/$rcptIdx] > no active handshake, initiating...")
val hsRes = di.messagingService.createHandshake(CreateHandshakeParams(rcptIO))
when (hsRes) {
is NetworkResponse.NetworkError -> {
println("[~] Sync > tx [${msg.rcpt_id}/$rcptIdx] > +hs > no network, aborting tx...")
println("[~] Sync > tx [-> ${msg.rcpt_id}/$rcptIdx] > +hs > no network, aborting tx...")
return false
}
is NetworkResponse.ApiError -> {
val errors = hsRes.body.errors
if (errors.hasErr("ekex", Code.required)) {
println("[~] Sync > tx [${msg.rcpt_id}/$rcptIdx] > +hs > self out of ekex keys, aborting tx...")
println("[~] Sync > tx [-> ${msg.rcpt_id}/$rcptIdx] > +hs > self out of ekex keys, aborting tx...")
return false
} else if (errors.hasErr("responder_ioid", Code.expired)) {
println("[~] Sync > tx [${msg.rcpt_id}/$rcptIdx] > +hs > peer is out of keys, skipping...")
return@rcptIO
println("[~] Sync > tx [-> ${msg.rcpt_id}/$rcptIdx] > +hs > peer is out of keys, skipping...")
continue@rcptIO
} else {
println("[!] Sync > tx [${msg.rcpt_id}/$rcptIdx] > +hs > unexpected api err: $hsRes")
println("[!] Sync > tx [-> ${msg.rcpt_id}/$rcptIdx] > +hs > unexpected api err: $hsRes")
return false
}
return@tx
}
is NetworkResponse.UnknownError -> {
println("[!] Sync > tx [${msg.rcpt_id}/$rcptIdx] > +hs > unexpected err: $hsRes")
println("[!] Sync > tx [-> ${msg.rcpt_id}/$rcptIdx] > +hs > unexpected err: $hsRes")
return false
}
else -> {}
}
val hs = (hsRes as NetworkResponse.ApiSuccess).body.data
println("[+] Sync > tx [${msg.rcpt_id}/$rcptIdx] > +hs > registered: $hs")
println("[+] Sync > tx [-> ${msg.rcpt_id}/$rcptIdx] > +hs > registered: $hs")

val init = initHandshake(hs, isInitiator = true)
val err = init.getError()
if (err != null) {
println("[!] Sync > rx [${msg.rcpt_id}/$rcptIdx] > +hs > err: $err")
return@rcptIO
println("[!] Sync > tx [-> ${msg.rcpt_id}/$rcptIdx] > +hs > err: $err")
continue@rcptIO
}
activeHandshakes[rcptIO] = init.unwrap()
}
val hs = activeHandshakes[rcptIO]!!
println("[*] Sync > tx [${msg.rcpt_id}/$rcptIdx] > encrypting msg: '${msg.payload}'")
println("[*] Sync > tx [-> ${msg.rcpt_id}/$rcptIdx] > encrypting msg: '${msg.payload}'")
val payload = msg.payload.encodeToByteArray().toUByteArray()
val enc = hs.hs.tx.encrypt(payload)
val enc64 = enc.toByteArray().toBase64()
println("[*] Sync > tx [${msg.rcpt_id}/$rcptIdx] > enc: $enc64")
println("[*] Sync > tx [-> ${msg.rcpt_id}/$rcptIdx] > enc: $enc64")
val p = ApiMsgPacket(
msg.id,
msg.from_id,
msg.rcpt_id,
rcptIO,
dup = 0L,
hs.meta.id,
seqno = hs.hs.tx.n.toLong() - 1L,
seqno = hs.hs.tx.n.toLong() - 1L, // already encrypted
enc64
)
packets.add(p)
}
println("[+] Sync > tx [+${msg.id}] > packets: #${packets.size}")
println("[+] Sync > tx [+${msg.id} -> ${msg.rcpt_id}] > packets: #${packets.size}")

val msgRes = di.messagingService.create(CreateMsgParams(msg.id, msg.rcpt_id, packets))
when (msgRes) {
is NetworkResponse.ApiSuccess -> {
val body = msgRes.body.data
println("[+] Sync > tx [+${msg.id}] > got receipts: #${body.rcpts.size} (rjct: #${body.rcpts.count { it.rjct_at != null }})")
println("[+] Sync > tx [+${msg.id} -> ${msg.rcpt_id}] > got receipts: #${body.rcpts.size} (rjct: #${body.rcpts.count { it.rjct_at != null }})")
db.msgQueries.transaction {
for (rcpt in body.rcpts) {
db.msgQueries.upsertReceipt(rcpt)
}
}
}
is NetworkResponse.ApiError, is NetworkResponse.UnknownError -> {
println("[!] Sync > tx [+${msg.id}] > unexpected err, aborting tx...")
println("[!] Sync > tx [+${msg.id} -> ${msg.rcpt_id}] > unexpected err, aborting tx...")
return false
}
is NetworkResponse.NetworkError -> {
println("[~] Sync > tx [+${msg.id}] > no network, aborting tx...")
println("[~] Sync > tx [+${msg.id} -> ${msg.rcpt_id}] > no network, aborting tx...")
return false
}
}
Expand Down
3 changes: 2 additions & 1 deletion common/src/commonMain/sqldelight/app/opia/common/db/msg.sq
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ CREATE TABLE IF NOT EXISTS msg_rcpt (
rcpt_ioid TEXT AS UUID NOT NULL,
dup INTEGER NOT NULL,
hs_id TEXT AS UUID, -- set if rjct
cause TEXT,
recv_at TEXT AS ZonedDateTime,
rjct_at TEXT AS ZonedDateTime CHECK (rjct_at IS NULL OR (recv_at IS NULL AND read_at IS NULL)),
read_at TEXT AS ZonedDateTime
Expand All @@ -53,7 +54,7 @@ getReceipt:
SELECT * FROM msg_rcpt WHERE msg_id = ? AND rcpt_ioid = ? AND dup = ?;

upsertReceipt:
INSERT OR REPLACE INTO msg_rcpt (msg_id, rcpt_ioid, dup, hs_id, recv_at, rjct_at, read_at) VALUES ?;
INSERT OR REPLACE INTO msg_rcpt (msg_id, rcpt_ioid, dup, hs_id, cause, recv_at, rjct_at, read_at) VALUES ?;

-- messages from self (any device? would never occur, but maybe if msgs were kept after ioid change) where no receipt _exists_
-- considers a rejected message as synced (rejections need to be handled differently than normal uploads)
Expand Down

0 comments on commit 5d5785e

Please sign in to comment.