Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/dev' into preview-videos-files
Browse files Browse the repository at this point in the history
  • Loading branch information
lgmarchi committed Jan 22, 2024
2 parents 68e5985 + 7853af5 commit 3807ae9
Show file tree
Hide file tree
Showing 13 changed files with 539 additions and 355 deletions.
2 changes: 1 addition & 1 deletion common/src/upload_file_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub enum UploadFileAction<T> {
Cancelling,
UploadFiles(Vec<PathBuf>),
Uploading((String, String, String)),
Finishing,
Finishing(PathBuf, bool),
Finished(T),
Error,
}
Expand Down
357 changes: 210 additions & 147 deletions common/src/warp_runner/manager/commands/constellation_commands.rs

Large diffs are not rendered by default.

94 changes: 17 additions & 77 deletions common/src/warp_runner/manager/commands/raygun_commands.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use chrono::{DateTime, Utc};
use derive_more::Display;
use futures::{channel::oneshot, StreamExt};
use futures::channel::oneshot;
use std::{
collections::{HashMap, HashSet},
ops::Range,
Expand All @@ -13,22 +13,21 @@ use warp::{
error::Error,
logging::tracing::log,
raygun::{
self, AttachmentKind, ConversationType, GroupSettings, Location, PinState, ReactionState,
self, AttachmentEventStream, ConversationType, GroupSettings, Location, PinState,
ReactionState,
},
};

use crate::{
state::{chats, identity, pending_message::PendingMessage, Friends},
state::{chats, identity, Friends},
warp_runner::{
conv_stream,
ui_adapter::{
self, conversation_to_chat, dids_to_identity, fetch_messages2, fetch_messages_between,
fetch_messages_from_chat, fetch_pinned_messages_from_chat, get_uninitialized_identity,
MessageEvent,
},
Account, FetchMessagesConfig, FetchMessagesResponse, Messaging, WarpEvent,
Account, FetchMessagesConfig, FetchMessagesResponse, Messaging,
},
WARP_EVENT_CH,
};

#[allow(clippy::large_enum_variant)]
Expand Down Expand Up @@ -106,16 +105,14 @@ pub enum RayGunCmd {
conv_id: Uuid,
msg: Vec<String>,
attachments: Vec<Location>,
appended_msg_id: Option<Uuid>,
rsp: oneshot::Sender<Result<(), warp::error::Error>>,
rsp: oneshot::Sender<Result<Option<AttachmentEventStream>, warp::error::Error>>,
},
#[display(fmt = "SendMessageForSeveralChats")]
SendMessageForSeveralChats {
convs_id: Vec<Uuid>,
msg: Vec<String>,
attachments: Vec<Location>,
appended_msg_id: Option<Uuid>,
rsp: oneshot::Sender<Result<(), warp::error::Error>>,
rsp: oneshot::Sender<Result<Vec<(Uuid, AttachmentEventStream)>, warp::error::Error>>,
},
#[display(fmt = "EditMessage")]
EditMessage {
Expand Down Expand Up @@ -144,7 +141,7 @@ pub enum RayGunCmd {
reply_to: Uuid,
msg: Vec<String>,
attachments: Vec<Location>,
rsp: oneshot::Sender<Result<(), warp::error::Error>>,
rsp: oneshot::Sender<Result<Option<AttachmentEventStream>, warp::error::Error>>,
},
// removes all direct conversations involving the recipient
#[display(fmt = "RemoveDirectConvs")]
Expand Down Expand Up @@ -274,43 +271,17 @@ pub async fn handle_raygun_cmd(
conv_id,
msg,
attachments,
appended_msg_id: ui_id,
rsp,
} => {
let r = if attachments.is_empty() {
messaging.send(conv_id, msg).await
messaging.send(conv_id, msg).await.map(|_| None)
} else {
//TODO: Pass stream off to attachment events
match messaging
.attach(conv_id, None, attachments.clone(), msg.clone())
.await
{
Ok(mut stream) => loop {
let msg_clone = msg.clone();
//let attachment_clone = attachments.clone();
if let Some(kind) = stream.next().await {
match kind {
AttachmentKind::Pending(result) => {
break result;
}
AttachmentKind::AttachedProgress(progress) => {
if let Err(e) = WARP_EVENT_CH.tx.send(WarpEvent::Message(
MessageEvent::AttachmentProgress {
progress,
conversation_id: conv_id,
msg: PendingMessage::for_compare(
msg_clone,
&attachments,
ui_id,
),
},
)) {
log::error!("failed to send warp_event: {e}");
}
}
}
}
},
Ok(stream) => Result::Ok(Some(stream)),
Err(e) => Err(e),
}
};
Expand All @@ -321,53 +292,27 @@ pub async fn handle_raygun_cmd(
convs_id,
msg,
attachments,
appended_msg_id: ui_id,
rsp,
} => {
let mut streams = vec![];
for chat_id in convs_id {
let _ = if attachments.is_empty() {
messaging.send(chat_id, msg.clone()).await
if attachments.is_empty() {
let _ = messaging.send(chat_id, msg.clone()).await;
} else {
//TODO: Pass stream off to attachment events
match messaging
.attach(chat_id, None, attachments.clone(), msg.clone())
.await
{
Ok(mut stream) => loop {
let msg_clone = msg.clone();
//let attachment_clone = attachments.clone();
if let Some(kind) = stream.next().await {
match kind {
AttachmentKind::Pending(result) => {
break result;
}
AttachmentKind::AttachedProgress(progress) => {
if let Err(e) = WARP_EVENT_CH.tx.send(WarpEvent::Message(
MessageEvent::AttachmentProgress {
progress,
conversation_id: chat_id,
msg: PendingMessage::for_compare(
msg_clone,
&attachments,
ui_id,
),
},
)) {
log::error!("failed to send warp_event: {e}");
}
}
}
}
},
Ok(stream) => streams.push((chat_id, stream)),
Err(e) => {
log::error!("Raygun: Send files to several chats: {}", e);
Err(e)
}
}
};
}

let _ = rsp.send(Ok(()));
let _ = rsp.send(Ok(streams));
}
RayGunCmd::EditMessage {
conv_id,
Expand Down Expand Up @@ -406,18 +351,13 @@ pub async fn handle_raygun_cmd(
rsp,
} => {
let r = if attachments.is_empty() {
messaging.reply(conv_id, reply_to, msg).await
messaging.reply(conv_id, reply_to, msg).await.map(|_| None)
} else {
//TODO: Pass stream off to attachment events
match messaging
.attach(conv_id, Some(reply_to), attachments, msg)
.await
{
Ok(mut stream) => loop {
if let Some(AttachmentKind::Pending(result)) = stream.next().await {
break result;
}
},
Ok(stream) => Result::Ok(Some(stream)),
Err(e) => Err(e),
}
};
Expand Down
3 changes: 1 addition & 2 deletions ui/src/components/friends/friends_list/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,12 +411,11 @@ pub fn ShareFriendsModal(cx: Scope<FriendProps>) -> Element {
let warp_cmd_tx = WARP_CMD_CH.tx.clone();
while let Some((id, uuid)) = rx.next().await {
let msg = vec![id.to_string()];
let (tx, rx) = oneshot::channel::<Result<(), warp::error::Error>>();
let (tx, rx) = oneshot::channel();
let cmd = RayGunCmd::SendMessageForSeveralChats {
convs_id: uuid,
msg,
attachments: Vec::new(),
appended_msg_id: None,
rsp: tx,
};
if let Err(e) = warp_cmd_tx.send(WarpCmd::RayGun(cmd)) {
Expand Down
37 changes: 24 additions & 13 deletions ui/src/layouts/chats/presentation/chatbar/coroutines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ use futures::{channel::oneshot, StreamExt};
use uuid::Uuid;
use warp::raygun::{self, Location};

use crate::layouts::chats::data::{
self, ChatProps, MsgChInput, TypingInfo, DEFAULT_MESSAGES_TO_TAKE,
use crate::{
layouts::chats::data::{self, ChatProps, MsgChInput, TypingInfo, DEFAULT_MESSAGES_TO_TAKE},
utils::async_task_queue::chat_upload_stream_handler,
};

use super::TypingIndicator;
Expand All @@ -23,8 +24,9 @@ pub fn get_msg_ch(
cx: &Scoped<'_, ChatProps>,
state: &UseSharedState<State>,
) -> Coroutine<MsgChInput> {
let upload_streams = chat_upload_stream_handler(cx);
use_coroutine(cx, |mut rx: UnboundedReceiver<MsgChInput>| {
to_owned![state];
to_owned![state, upload_streams];
async move {
let warp_cmd_tx = WARP_CMD_CH.tx.clone();
while let Some(MsgChInput {
Expand All @@ -34,7 +36,7 @@ pub fn get_msg_ch(
replying_to,
}) = rx.next().await
{
let (tx, rx) = oneshot::channel::<Result<(), warp::error::Error>>();
let (tx, rx) = oneshot::channel();
let attachments = state
.read()
.get_active_chat()
Expand All @@ -45,15 +47,14 @@ pub fn get_msg_ch(
Some(reply_to) => RayGunCmd::Reply {
conv_id,
reply_to,
msg,
msg: msg.clone(),
attachments,
rsp: tx,
},
None => RayGunCmd::SendMessage {
conv_id,
msg,
msg: msg.clone(),
attachments,
appended_msg_id,
rsp: tx,
},
};
Expand Down Expand Up @@ -89,14 +90,24 @@ pub fn get_msg_ch(
}

let rsp = rx.await.expect("command canceled");
if let Err(e) = rsp {
log::error!("failed to send message: {}", e);
state.write().decrement_outgoing_messages(
match rsp {
Ok(Some(attachment)) => upload_streams.write().append((
conv_id,
msg_clone,
attachment_files,
msg,
attachments,
appended_msg_id,
);
attachment,
)),
Err(e) => {
log::error!("failed to send message: {}", e);
state.write().decrement_outgoing_messages(
conv_id,
msg_clone,
attachment_files,
appended_msg_id,
)
}
_ => {}
}
}
}
Expand Down
25 changes: 7 additions & 18 deletions ui/src/layouts/chats/presentation/messages/coroutines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
data::{self, ChatBehavior, ChatData, JsMsg, ScrollBtn, DEFAULT_MESSAGES_TO_TAKE},
scripts,
},
utils::download::get_download_path,
utils::{async_task_queue::download_stream_handler, download::get_download_path},
};

use super::{DownloadTracker, MessagesCommand};
Expand Down Expand Up @@ -442,8 +442,9 @@ pub fn handle_warp_commands(
state: &UseSharedState<State>,
pending_downloads: &UseSharedState<DownloadTracker>,
) -> Coroutine<MessagesCommand> {
let download_streams = download_stream_handler(cx);
let ch = use_coroutine(cx, |mut rx: UnboundedReceiver<MessagesCommand>| {
to_owned![state, pending_downloads];
to_owned![state, pending_downloads, download_streams];
async move {
let warp_cmd_tx = WARP_CMD_CH.tx.clone();
while let Some(cmd) = rx.next().await {
Expand Down Expand Up @@ -536,22 +537,10 @@ pub fn handle_warp_commands(

let res = rx.await.expect("command canceled");
match res {
Ok(mut stream) => {
while let Some(p) = stream.next().await {
log::debug!("{p:?}");
}
state.write().mutate(Action::AddToastNotification(
ToastNotification::init(
"".into(),
get_local_text_with_args(
"files.download-success",
vec![("file", file.name())],
),
None,
2,
),
));
on_finish.await
Ok(stream) => {
download_streams
.write()
.append((stream, file.name(), on_finish));
}
Err(e) => {
state.write().mutate(Action::AddToastNotification(
Expand Down
3 changes: 1 addition & 2 deletions ui/src/layouts/chats/presentation/quick_profile/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,11 @@ pub fn QuickProfileContext<'a>(cx: Scope<'a, QuickProfileProps<'a>>) -> Element<
None => return,
};
let msg_vec = msg.clone();
let (tx, rx) = oneshot::channel::<Result<(), warp::error::Error>>();
let (tx, rx) = oneshot::channel();
let cmd = RayGunCmd::SendMessage {
conv_id: c,
msg,
attachments: Vec::new(),
appended_msg_id: uuid,
rsp: tx,
};
if let Err(e) = warp_cmd_tx.send(WarpCmd::RayGun(cmd)) {
Expand Down
2 changes: 1 addition & 1 deletion ui/src/layouts/log_in/copy_seed_words.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ fn SeedWords(cx: Scope, page: UseState<AuthPages>, words: Vec<String>) -> Elemen
},
span {
aria_label: "seed-word-value-{((idx * 2) + 1).to_string()}",
class: "val", vals.get(0).cloned().unwrap_or_default()
class: "val", vals.first().cloned().unwrap_or_default()
}
},
div {
Expand Down
Loading

0 comments on commit 3807ae9

Please sign in to comment.