Skip to content

Commit

Permalink
Merge pull request #27 from pubky/feat/feed
Browse files Browse the repository at this point in the history
Feat/feed
  • Loading branch information
Nuhvi authored Aug 26, 2024
2 parents 620e2ad + a5dfff2 commit 9131cef
Show file tree
Hide file tree
Showing 11 changed files with 391 additions and 108 deletions.
27 changes: 27 additions & 0 deletions pubky-common/src/timestamp.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Monotonic unix timestamp in microseconds
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use std::{
ops::{Add, Sub},
Expand Down Expand Up @@ -83,6 +84,12 @@ impl Timestamp {
}
}

impl Default for Timestamp {
fn default() -> Self {
Timestamp::now()
}
}

impl Display for Timestamp {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let bytes: [u8; 8] = self.into();
Expand Down Expand Up @@ -155,6 +162,26 @@ impl Sub<u64> for &Timestamp {
}
}

impl Serialize for Timestamp {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let bytes = self.to_bytes();
bytes.serialize(serializer)
}
}

impl<'de> Deserialize<'de> for Timestamp {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let bytes: [u8; 8] = Deserialize::deserialize(deserializer)?;
Ok(Timestamp(u64::from_be_bytes(bytes)))
}
}

#[cfg(not(target_arch = "wasm32"))]
/// Return the number of microseconds since [SystemTime::UNIX_EPOCH]
fn system_time() -> u64 {
Expand Down
4 changes: 3 additions & 1 deletion pubky-homeserver/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ pub mod tables;

use tables::{Tables, TABLES_COUNT};

pub const MAX_LIST_LIMIT: u16 = 100;

#[derive(Debug, Clone)]
pub struct DB {
pub(crate) env: Env,
Expand Down Expand Up @@ -43,7 +45,7 @@ mod tests {
.join(Timestamp::now().to_string())
.join("pubky");

let mut db = DB::open(&storage).unwrap();
let db = DB::open(&storage).unwrap();

let keypair = Keypair::random();
let path = "/pub/foo.txt";
Expand Down
4 changes: 3 additions & 1 deletion pubky-homeserver/src/database/migrations/m0.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use heed::{Env, RwTxn};

use crate::database::tables::{blobs, entries, sessions, users};
use crate::database::tables::{blobs, entries, events, sessions, users};

pub fn run(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> {
let _: users::UsersTable = env.create_database(wtxn, Some(users::USERS_TABLE))?;
Expand All @@ -11,5 +11,7 @@ pub fn run(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> {

let _: entries::EntriesTable = env.create_database(wtxn, Some(entries::ENTRIES_TABLE))?;

let _: events::EventsTable = env.create_database(wtxn, Some(events::EVENTS_TABLE))?;

Ok(())
}
9 changes: 8 additions & 1 deletion pubky-homeserver/src/database/tables.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod blobs;
pub mod entries;
pub mod events;
pub mod sessions;
pub mod users;

Expand All @@ -8,12 +9,15 @@ use heed::{Env, RwTxn};
use blobs::{BlobsTable, BLOBS_TABLE};
use entries::{EntriesTable, ENTRIES_TABLE};

pub const TABLES_COUNT: u32 = 4;
use self::events::{EventsTable, EVENTS_TABLE};

pub const TABLES_COUNT: u32 = 5;

#[derive(Debug, Clone)]
pub struct Tables {
pub blobs: BlobsTable,
pub entries: EntriesTable,
pub events: EventsTable,
}

impl Tables {
Expand All @@ -25,6 +29,9 @@ impl Tables {
entries: env
.open_database(wtxn, Some(ENTRIES_TABLE))?
.expect("Entries table already created"),
events: env
.open_database(wtxn, Some(EVENTS_TABLE))?
.expect("Events table already created"),
})
}
}
2 changes: 1 addition & 1 deletion pubky-homeserver/src/database/tables/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub const BLOBS_TABLE: &str = "blobs";

impl DB {
pub fn get_blob(
&mut self,
&self,
public_key: &PublicKey,
path: &str,
) -> anyhow::Result<Option<bytes::Bytes>> {
Expand Down
41 changes: 33 additions & 8 deletions pubky-homeserver/src/database/tables/entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ use pubky_common::{
timestamp::Timestamp,
};

use crate::database::DB;
use crate::database::{DB, MAX_LIST_LIMIT};

use super::events::Event;

/// full_path(pubky/*path) => Entry.
pub type EntriesTable = Database<Str, Bytes>;

pub const ENTRIES_TABLE: &str = "entries";

const MAX_LIST_LIMIT: u16 = 100;

impl DB {
pub fn put_entry(
&mut self,
Expand Down Expand Up @@ -56,6 +56,19 @@ impl DB {
.entries
.put(&mut wtxn, &key, &entry.serialize())?;

if path.starts_with("pub/") {
let url = format!("pubky://{key}");
let event = Event::put(&url);
let value = event.serialize();

let key = entry.timestamp.to_string();

self.tables.events.put(&mut wtxn, &key, &value)?;

// TODO: delete older events.
// TODO: move to events.rs
}

wtxn.commit()?;

Ok(())
Expand All @@ -74,6 +87,21 @@ impl DB {

let deleted_entry = self.tables.entries.delete(&mut wtxn, &key)?;

// create DELETE event
if path.starts_with("pub/") {
let url = format!("pubky://{key}");

let event = Event::delete(&url);
let value = event.serialize();

let key = Timestamp::now().to_string();

self.tables.events.put(&mut wtxn, &key, &value)?;

// TODO: delete older events.
// TODO: move to events.rs
}

deleted_entry & deleted_blobs
} else {
false
Expand Down Expand Up @@ -198,7 +226,7 @@ pub struct Entry {
/// Encoding version
version: usize,
/// Modified at
timestamp: u64,
timestamp: Timestamp,
content_hash: [u8; 32],
content_length: usize,
content_type: String,
Expand All @@ -209,10 +237,7 @@ pub struct Entry {

impl Entry {
pub fn new() -> Self {
Self {
timestamp: Timestamp::now().into_inner(),
..Default::default()
}
Default::default()
}

// === Setters ===
Expand Down
58 changes: 58 additions & 0 deletions pubky-homeserver/src/database/tables/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
//! Server events (Put and Delete entries)
//!
//! Useful as a realtime sync with Indexers until
//! we implement more self-authenticated merkle data.
use heed::{
types::{Bytes, Str},
Database,
};
use postcard::{from_bytes, to_allocvec};
use serde::{Deserialize, Serialize};

/// Event [Timestamp] base32 => Encoded event.
pub type EventsTable = Database<Str, Bytes>;

pub const EVENTS_TABLE: &str = "events";

#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq)]
pub enum Event {
Put(String),
Delete(String),
}

impl Event {
pub fn put(url: &str) -> Self {
Self::Put(url.to_string())
}

pub fn delete(url: &str) -> Self {
Self::Delete(url.to_string())
}

pub fn serialize(&self) -> Vec<u8> {
to_allocvec(self).expect("Session::serialize")
}

pub fn deserialize(bytes: &[u8]) -> core::result::Result<Self, postcard::Error> {
if bytes[0] > 1 {
panic!("Unknown Event version");
}

from_bytes(bytes)
}

pub fn url(&self) -> &str {
match self {
Event::Put(url) => url,
Event::Delete(url) => url,
}
}

pub fn operation(&self) -> &str {
match self {
Event::Put(_) => "PUT",
Event::Delete(_) => "DEL",
}
}
}
2 changes: 2 additions & 0 deletions pubky-homeserver/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::server::AppState;
use self::pkarr::pkarr_router;

mod auth;
mod feed;
mod pkarr;
mod public;
mod root;
Expand All @@ -25,6 +26,7 @@ fn base(state: AppState) -> Router {
.route("/:pubky/*path", put(public::put))
.route("/:pubky/*path", get(public::get))
.route("/:pubky/*path", delete(public::delete))
.route("/events/", get(feed::feed))
.layer(CookieManagerLayer::new())
// TODO: revisit if we enable streaming big payloads
// TODO: maybe add to a separate router (drive router?).
Expand Down
71 changes: 71 additions & 0 deletions pubky-homeserver/src/routes/feed.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use std::collections::HashMap;

use axum::{
body::Body,
extract::{Query, State},
http::{header, Response, StatusCode},
response::IntoResponse,
};

use crate::{
database::{tables::events::Event, MAX_LIST_LIMIT},
error::Result,
server::AppState,
};

pub async fn feed(
State(state): State<AppState>,
Query(params): Query<HashMap<String, String>>,
) -> Result<impl IntoResponse> {
let txn = state.db.env.read_txn()?;

let limit = params
.get("limit")
.and_then(|l| l.parse::<u16>().ok())
.unwrap_or(MAX_LIST_LIMIT)
.min(MAX_LIST_LIMIT);

let mut cursor = params
.get("cursor")
.map(|c| c.as_str())
.unwrap_or("0000000000000");

// Guard against bad cursor
if cursor.len() < 13 {
cursor = "0000000000000"
}

let mut result: Vec<String> = vec![];
let mut next_cursor = cursor.to_string();

for _ in 0..limit {
match state
.db
.tables
.events
.get_greater_than(&txn, &next_cursor)?
{
Some((timestamp, event_bytes)) => {
let event = Event::deserialize(event_bytes)?;

let line = format!("{} {}", event.operation(), event.url());
next_cursor = timestamp.to_string();

result.push(line);
}
None => break,
};
}

if !result.is_empty() {
result.push(format!("cursor: {next_cursor}"))
}

txn.commit()?;

Ok(Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "text/plain")
.body(Body::from(result.join("\n")))
.unwrap())
}
4 changes: 2 additions & 2 deletions pubky-homeserver/src/routes/public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub async fn put(
}

pub async fn get(
State(mut state): State<AppState>,
State(state): State<AppState>,
pubky: Pubky,
path: EntryPath,
Query(params): Query<HashMap<String, String>>,
Expand Down Expand Up @@ -96,7 +96,7 @@ pub async fn get(

return Ok(Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/json")
.header(header::CONTENT_TYPE, "text/plain")
.body(Body::from(vec.join("\n")))
.unwrap());
}
Expand Down
Loading

0 comments on commit 9131cef

Please sign in to comment.