382 gpodder sync is incomplete (#388)
* Improved last-played aggregation with a hashmap.

* Added column for SQLite.

* Added migration tool, run via podfetch migration episodes.

* Added migration for postgres.

* Fixed clippy.
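
The common thread in these changes is URL normalization: gpodder clients often report an episode URL with a tracking query string appended, so exact-match lookups against the stored URL failed and sync stayed incomplete. A minimal sketch of the cleaning step this commit applies throughout, assuming reqwest's re-export of url::Url as used in the diff below (the function name and example URL are illustrative only):

    use reqwest::Url;

    // Strip the query string so a tracking-wrapped URL compares equal
    // to the URL stored from the feed.
    fn clean_episode_url(raw: &str) -> Option<String> {
        let mut parsed = Url::parse(raw).ok()?; // None for malformed URLs
        parsed.set_query(None);
        Some(parsed.to_string())
    }

    // clean_episode_url("https://cdn.example.com/ep1.mp3?ua=firefox")
    //     returns Some("https://cdn.example.com/ep1.mp3".to_string())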
SamTV12345 authored Nov 5, 2023
1 parent 7307e36 commit ec8a492
Showing 10 changed files with 94 additions and 61 deletions.
2 changes: 2 additions & 0 deletions migrations/postgres/…/down.sql
@@ -0,0 +1,2 @@
-- This file should undo anything in `up.sql`
ALTER TABLE episodes DROP COLUMN cleaned_url;
3 changes: 3 additions & 0 deletions migrations/postgres/…/up.sql
@@ -0,0 +1,3 @@
-- Your SQL goes here
ALTER TABLE episodes ADD COLUMN cleaned_url TEXT NOT NULL DEFAULT '';
UPDATE episodes SET cleaned_url = episode;
2 changes: 2 additions & 0 deletions migrations/sqlite/2023-11-05-155720_episode_clean_url/down.sql
@@ -0,0 +1,2 @@
-- This file should undo anything in `up.sql`
ALTER TABLE episodes DROP COLUMN cleaned_url;
3 changes: 3 additions & 0 deletions migrations/sqlite/2023-11-05-155720_episode_clean_url/up.sql
@@ -0,0 +1,3 @@
-- Your SQL goes here
ALTER TABLE episodes ADD COLUMN cleaned_url TEXT NOT NULL DEFAULT '';
UPDATE episodes SET cleaned_url = episode;
10 changes: 8 additions & 2 deletions src/command_line_runner.rs
@@ -39,7 +39,7 @@ pub fn start_command_line(mut args: Args) {
let podcast_rss_feed = args.next();

match podcast_rss_feed {
Some(feed)=>{
Some(feed) => {
let mut podcast_service = PodcastService::new();
let conn = &mut establish_connection();

@@ -53,7 +53,7 @@ pub fn start_command_line(mut args: Args) {
podcast_episode_service.insert_podcast_episodes(conn, podcast.clone()).unwrap();
podcast_service.schedule_episode_download(podcast, None, conn).unwrap();
}
None=>{
None => {
println!("Please provide a podcast rss feed url");
exit(1);
}
@@ -183,6 +183,12 @@ pub fn start_command_line(mut args: Args) {
}
}
}
"migration" => {
if args.next().unwrap().as_str() == "episodes" {
Episode::migrate_episode_urls(&mut establish_connection());
println!("Successfully migrated episode urls.")
}
}
"debug" => {
create_debug_message();
}
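With the new match arm above, the backfill can be triggered from the command line. Assuming the binary is invoked as podfetch, as the commit message suggests, the call would be:

    podfetch migration episodes

It loads every stored episode action, strips the query string, and writes the result into the new cleaned_url column (see migrate_episode_urls in src/models/episode.rs below).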
44 changes: 23 additions & 21 deletions src/controllers/watch_time_controller.rs
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::ops::DerefMut;
use crate::models::misc_models::{PodcastWatchedEpisodeModelWithPodcastEpisode, PodcastWatchedPostModel};
use actix_web::web::Data;
@@ -42,35 +43,36 @@ Option<web::ReqData<User>>, mapping_service:Data<Mutex<MappingService>>) -> Resu

let designated_username = requester.unwrap().username.clone();
let last_watched = PodcastHistoryItem::get_last_watched_podcasts(&mut establish_connection(),
designated_username
.clone(), mapping_service.lock().ignore_poison().clone()).unwrap();

let episodes = Episode::get_last_watched_episodes(designated_username,
conn.get().map_err(map_r2d2_error)?.deref_mut(),
)?;
conn.get().map_err(map_r2d2_error)?.deref_mut())?;
-    let mut episodes_with_logs = last_watched.iter().map(|e|{
-        let episode = episodes.iter().find(|e1| e1.episode_id == e.episode_id);
-        match episode {
-            Some(episode) => {
-                if episode.watched_time>e.watched_time{
-                    return episode
-                }
-                e
-            },
-            None => {
-                e
-            }
-        }
-    }).collect::<Vec<&PodcastWatchedEpisodeModelWithPodcastEpisode>>();
-
-    episodes.iter().for_each(|x|{
-        if !episodes_with_logs.iter().any(|e| e.episode_id == x.episode_id){
-            episodes_with_logs.push(x);
-        }
-    });
-    episodes_with_logs.sort_by(|a,b| a.date.cmp(&b.date).reverse());
-    Ok(HttpResponse::Ok().json(episodes_with_logs))
+    let mut last_watched_episodes:HashMap<String,
+        PodcastWatchedEpisodeModelWithPodcastEpisode> = HashMap::from_iter(last_watched.iter().map(|e| (e
+        .episode_id
+        .clone(), e.clone())));
+
+    episodes.iter().for_each(|v|{
+        match last_watched_episodes.contains_key(&v.episode_id){
+            true => {
+                let e1 = last_watched_episodes.get(&v.episode_id).unwrap();
+                if e1.date<v.date{
+                    last_watched_episodes.insert(v.episode_id.clone(), v.clone());
+                }
+            },
+            false => {
+                last_watched_episodes.insert(v.episode_id.clone(), v.clone());
+            }
+        }
+    });
+
+    let mut extracted_values = last_watched_episodes
+        .values()
+        .cloned()
+        .collect::<Vec<PodcastWatchedEpisodeModelWithPodcastEpisode>>();
+    extracted_values.sort_by(|a,b| a.date.cmp(&b.date).reverse());
+    Ok(HttpResponse::Ok().json(extracted_values))
}

#[utoipa::path(
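The rewrite above swaps a linear find per history item (quadratic overall) for a single HashMap keyed by episode_id, keeping whichever record carries the newer date. A self-contained sketch of the pattern, with a hypothetical simplified Watched type standing in for PodcastWatchedEpisodeModelWithPodcastEpisode:

    use std::collections::HashMap;

    #[derive(Clone, Debug)]
    struct Watched {
        episode_id: String,
        date: i64,
    }

    // Merge two sources of watch records: one entry per episode,
    // newer date wins, result sorted newest first.
    fn merge_latest(history: Vec<Watched>, episodes: Vec<Watched>) -> Vec<Watched> {
        let mut by_id: HashMap<String, Watched> = history
            .into_iter()
            .map(|e| (e.episode_id.clone(), e))
            .collect();
        for v in episodes {
            let replace = match by_id.get(&v.episode_id) {
                Some(existing) => existing.date < v.date, // only newer records win
                None => true,
            };
            if replace {
                by_id.insert(v.episode_id.clone(), v);
            }
        }
        let mut out: Vec<Watched> = by_id.into_values().collect();
        out.sort_by(|a, b| b.date.cmp(&a.date)); // newest first
        out
    }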
1 change: 1 addition & 0 deletions src/dbconfig/schemas/postgresql/schema.rs
@@ -23,6 +23,7 @@ diesel::table! {
started -> Nullable<Int4>,
position -> Nullable<Int4>,
total -> Nullable<Int4>,
cleaned_url -> Text,
}
}

1 change: 1 addition & 0 deletions src/dbconfig/schemas/sqlite/schema.rs
@@ -23,6 +23,7 @@ diesel::table! {
started -> Nullable<Integer>,
position -> Nullable<Integer>,
total -> Nullable<Integer>,
cleaned_url -> Text,
}
}

28 changes: 5 additions & 23 deletions src/gpodder/episodes/gpodder_episodes.rs
@@ -7,7 +7,6 @@ use actix_web::web::Data;
use crate::DbPool;
use crate::models::episode::{Episode, EpisodeAction, EpisodeDto};
use chrono::NaiveDateTime;
use crate::models::podcast_episode::PodcastEpisode;
use crate::models::podcast_history_item::PodcastHistoryItem;
use crate::models::session::Session;
use crate::utils::error::{CustomError, map_r2d2_error};
@@ -34,8 +33,7 @@ pub struct EpisodeSinceRequest{
#[get("/episodes/{username}.json")]
pub async fn get_episode_actions(username: web::Path<String>, pool: Data<DbPool>,
opt_flag: Option<web::ReqData<Session>>,
since: web::Query<EpisodeSinceRequest>) -> Result<HttpResponse,
CustomError> {
since: web::Query<EpisodeSinceRequest>) -> Result<HttpResponse, CustomError> {
match opt_flag {
Some(flag) => {
let username = username.clone();
@@ -60,6 +58,7 @@ pub async fn get_episode_actions(username: web::Path<String>, pool: Data<DbPool>
started: Option::from(watch_log.clone().0.watched_time),
position: Option::from(watch_log.clone().0.watched_time),
total: Option::from(watch_log.clone().1.total_time),
cleaned_url: "".to_string()
}
}).collect::<Vec<Episode>>();

@@ -87,27 +86,10 @@ pub async fn upload_episode_actions(username: web::Path<String>, podcast_episode
let mut inserted_episodes: Vec<Episode> = vec![];
podcast_episode.iter().for_each(|episode| {
let episode = Episode::convert_to_episode(episode, username.clone());
-            inserted_episodes.push(Episode::insert_episode(&episode.clone(), &mut conn
-                .get()
-                .unwrap())
-                .expect("Unable to insert episode"));
-
-            if EpisodeAction::from_string(&episode.clone().action) == EpisodeAction::Play {
-                let mut episode_url = episode.clone().episode;
-                // Sometimes podcast provider like to check which browser access their podcast
-                let mut first_split = episode.episode.split('?');
-                let res = first_split.next();
-
-                if let Some(unwrapped_res) = res {
-                    episode_url = unwrapped_res.parse().unwrap()
-                };
-
-                let podcast_episode = PodcastEpisode::query_podcast_episode_by_url(conn.get()
-                    .map_err(map_r2d2_error).unwrap().deref_mut(),
-                    &episode_url);
-                if podcast_episode.clone().unwrap().is_none() {
-                }
-            }
+            inserted_episodes.push(Episode::insert_episode(&episode.clone(), conn
+                .get()
+                .map_err(map_r2d2_error).unwrap()
+                .deref_mut()).unwrap());
});
Ok(HttpResponse::Ok().json(EpisodeActionPostResponse {
update_urls: vec![],
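For context, the payload this endpoint ingests follows the gpodder.net episode-action format, where episode is the raw media URL exactly as the client saw it. A representative action with hypothetical values, mirroring the fields of EpisodeDto in this codebase:

    {
        "podcast": "https://example.org/feed.xml",
        "episode": "https://cdn.example.org/ep1.mp3?ua=firefox",
        "device": "phone",
        "action": "play",
        "timestamp": "2023-11-05T12:00:00",
        "started": 0,
        "position": 120,
        "total": 500
    }

Because episode arrives with any tracking query intact, convert_to_episode (below) now derives cleaned_url from it before insertion.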
61 changes: 46 additions & 15 deletions src/models/episode.rs
@@ -2,14 +2,16 @@ use std::fmt;
use std::io::Error;
use chrono::NaiveDateTime;
use serde::{Deserialize, Serialize};
use diesel::{Queryable, QueryableByName, Insertable, RunQueryDsl, QueryDsl, BoolExpressionMethods, OptionalExtension};
use diesel::{Queryable, QueryableByName, Insertable, RunQueryDsl, QueryDsl, BoolExpressionMethods, OptionalExtension, TextExpressionMethods};

use crate::dbconfig::schema::episodes;
use utoipa::ToSchema;
use diesel::sql_types::{Integer, Text, Nullable, Timestamp};
use diesel::ExpressionMethods;
use reqwest::Url;

use crate::{DbConnection};
use crate::dbconfig::schema::episodes::dsl::episodes as episodes_dsl;

use crate::models::misc_models::PodcastWatchedEpisodeModelWithPodcastEpisode;
use crate::models::podcast_episode::PodcastEpisode;
@@ -41,6 +43,8 @@ pub struct Episode{
pub position:Option<i32>,
#[diesel(sql_type = Nullable<Integer>)]
pub total:Option<i32>,
#[diesel(sql_type = Text)]
pub cleaned_url: String
}


@@ -49,9 +53,9 @@ impl Episode{
use crate::dbconfig::schema::episodes::dsl::*;

let res = episodes.filter(timestamp.eq(self.clone().timestamp)
.and(device.eq(self.clone().device))
.and(podcast.eq(self.clone().podcast))
.and(episode.eq(self.clone().episode))
.and(device.eq(&self.clone().device))
.and(podcast.eq(&self.clone().podcast))
.and(episode.eq(&self.clone().episode))
.and(timestamp.eq(self.clone().timestamp)))
.first::<Episode>(conn)
.optional()
@@ -61,6 +65,8 @@ impl Episode{
return Ok(unwrapped_res)
}

let mut cleaned_url_parsed = Url::parse(&self.episode).unwrap();
cleaned_url_parsed.set_query(None);
diesel::insert_into(episodes)
.values((
username.eq(&self.username),
@@ -72,7 +78,8 @@ impl Episode{
action.eq(&self.action),
started.eq(&self.started),
position.eq(&self.position),
total.eq(&self.total)
total.eq(&self.total),
cleaned_url.eq(&cleaned_url_parsed.to_string()),
))
.get_result(conn)
}
@@ -87,11 +94,15 @@ impl Episode{
started: self.started,
position: self.position,
total: self.total,
device: self.clone().device,
device: self.clone().device.clone(),
}
}

pub fn convert_to_episode(episode_dto: &EpisodeDto, username: String)->Episode{
// Remove query parameters
let mut episode = Url::parse(&episode_dto.episode).unwrap();
episode.set_query(None);

Episode {
id: 0,
username,
Expand All @@ -104,6 +115,7 @@ impl Episode{
started: episode_dto.started,
position: episode_dto.position,
total: episode_dto.total,
cleaned_url: episode.to_string(),
}
}
pub async fn get_actions_by_username(username1: String, conn: &mut DbConnection, since_date: Option<NaiveDateTime>) ->Vec<Episode>{
@@ -162,7 +174,7 @@ impl Episode{
}
}

pub fn get_last_watched_episodes(username1: String, conn: &mut DbConnection)
pub fn get_last_watched_episodes(username_to_find: String, conn: &mut DbConnection)
->Result<Vec<PodcastWatchedEpisodeModelWithPodcastEpisode>,
CustomError>{
use crate::dbconfig::schema::episodes::dsl::*;
@@ -171,24 +183,29 @@ impl Episode{
use diesel::JoinOnDsl;

let query = podcast_episodes
.inner_join(episodes.on(podcast.eq(url)))
.inner_join(podcast_table::table.on(podcast_table::rssfeed.eq(podcast)))
.filter(username.eq(username1))
.inner_join(episodes.on(url.like(cleaned_url.concat("%"))))
.inner_join(podcast_table::table.on(podcast_table::id.eq(podcast_id)))
.filter(username.eq(username_to_find.clone()))
.load::<(PodcastEpisode,Episode, Podcast)>(conn)
.map_err(map_db_error)?;

let _query_1 = &podcast_episodes
.inner_join(episodes.on(url.like(cleaned_url.concat("%"))))
.inner_join(podcast_table::table.on(podcast_table::id.eq(podcast_id)))
.filter(username.eq(username_to_find));

let mapped_watched_episodes = query.iter().map(|e|{
PodcastWatchedEpisodeModelWithPodcastEpisode{
id: e.clone().1.id,
podcast_id: e.clone().2.id,
episode_id: e.clone().0.episode_id,
url: e.clone().0.url,
name: e.clone().0.name,
image_url: e.clone().0.image_url,
episode_id: e.0.episode_id.clone(),
url: e.0.url.clone(),
name: e.0.name.clone(),
image_url: e.0.image_url.clone(),
watched_time: e.clone().1.position.unwrap(),
date: e.clone().1.timestamp,
total_time: e.clone().0.total_time,
podcast_episode: e.clone().0,
podcast_episode: e.0.clone(),
podcast: e.2.clone(),
}
}).collect();
@@ -202,6 +219,20 @@ impl Episode{
.execute(conn).expect("");
Ok(())
}

pub fn migrate_episode_urls(conn: &mut DbConnection){
let episodes_loaded = episodes_dsl
.load::<Episode>(conn)
.expect("");
episodes_loaded.iter().for_each(|e|{
let mut cleaned_url_parsed = Url::parse(&e.episode).unwrap();
cleaned_url_parsed.set_query(None);
diesel::update(
episodes_dsl.filter(episodes::id.eq(e.id)))
.set(episodes::cleaned_url.eq(cleaned_url_parsed.to_string()))
.execute(conn).expect("");
});
}
}

#[derive(Debug, Deserialize, Serialize)]
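The reworked join in get_last_watched_episodes matches a feed episode whenever its URL begins with the stored cleaned_url, written with diesel's TextExpressionMethods. A stripped-down sketch of just that predicate against hypothetical minimal tables (the real schema carries many more columns):

    use diesel::prelude::*;
    use diesel::TextExpressionMethods; // provides .like() and .concat()

    diesel::table! {
        podcast_episodes (id) { id -> Integer, url -> Text, }
    }
    diesel::table! {
        episodes (id) { id -> Integer, cleaned_url -> Text, }
    }
    diesel::allow_tables_to_appear_in_same_query!(podcast_episodes, episodes);

    // SQL: ... JOIN episodes ON podcast_episodes.url LIKE episodes.cleaned_url || '%'
    // A URL that differs only by a trailing query string still pairs up.
    fn join_sketch() {
        let _query = podcast_episodes::table.inner_join(
            episodes::table.on(podcast_episodes::url.like(episodes::cleaned_url.concat("%"))),
        );
    }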
