Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/feed #27

Merged
merged 9 commits into from
Aug 26, 2024
27 changes: 27 additions & 0 deletions pubky-common/src/timestamp.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Monotonic unix timestamp in microseconds

use serde::{Deserialize, Serialize};
use std::fmt::Display;
use std::{
ops::{Add, Sub},
Expand Down Expand Up @@ -83,6 +84,12 @@ impl Timestamp {
}
}

impl Default for Timestamp {
fn default() -> Self {
Timestamp::now()
}
}

impl Display for Timestamp {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let bytes: [u8; 8] = self.into();
Expand Down Expand Up @@ -155,6 +162,26 @@ impl Sub<u64> for &Timestamp {
}
}

impl Serialize for Timestamp {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let bytes = self.to_bytes();
bytes.serialize(serializer)
}
}

impl<'de> Deserialize<'de> for Timestamp {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let bytes: [u8; 8] = Deserialize::deserialize(deserializer)?;
Ok(Timestamp(u64::from_be_bytes(bytes)))
}
}

#[cfg(not(target_arch = "wasm32"))]
/// Return the number of microseconds since [SystemTime::UNIX_EPOCH]
fn system_time() -> u64 {
Expand Down
4 changes: 3 additions & 1 deletion pubky-homeserver/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ pub mod tables;

use tables::{Tables, TABLES_COUNT};

pub const MAX_LIST_LIMIT: u16 = 100;

#[derive(Debug, Clone)]
pub struct DB {
pub(crate) env: Env,
Expand Down Expand Up @@ -43,7 +45,7 @@ mod tests {
.join(Timestamp::now().to_string())
.join("pubky");

let mut db = DB::open(&storage).unwrap();
let db = DB::open(&storage).unwrap();

let keypair = Keypair::random();
let path = "/pub/foo.txt";
Expand Down
4 changes: 3 additions & 1 deletion pubky-homeserver/src/database/migrations/m0.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use heed::{Env, RwTxn};

use crate::database::tables::{blobs, entries, sessions, users};
use crate::database::tables::{blobs, entries, events, sessions, users};

pub fn run(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> {
let _: users::UsersTable = env.create_database(wtxn, Some(users::USERS_TABLE))?;
Expand All @@ -11,5 +11,7 @@ pub fn run(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> {

let _: entries::EntriesTable = env.create_database(wtxn, Some(entries::ENTRIES_TABLE))?;

let _: events::EventsTable = env.create_database(wtxn, Some(events::EVENTS_TABLE))?;

Ok(())
}
9 changes: 8 additions & 1 deletion pubky-homeserver/src/database/tables.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod blobs;
pub mod entries;
pub mod events;
pub mod sessions;
pub mod users;

Expand All @@ -8,12 +9,15 @@ use heed::{Env, RwTxn};
use blobs::{BlobsTable, BLOBS_TABLE};
use entries::{EntriesTable, ENTRIES_TABLE};

pub const TABLES_COUNT: u32 = 4;
use self::events::{EventsTable, EVENTS_TABLE};

pub const TABLES_COUNT: u32 = 5;

#[derive(Debug, Clone)]
pub struct Tables {
pub blobs: BlobsTable,
pub entries: EntriesTable,
pub events: EventsTable,
}

impl Tables {
Expand All @@ -25,6 +29,9 @@ impl Tables {
entries: env
.open_database(wtxn, Some(ENTRIES_TABLE))?
.expect("Entries table already created"),
events: env
.open_database(wtxn, Some(EVENTS_TABLE))?
.expect("Events table already created"),
})
}
}
2 changes: 1 addition & 1 deletion pubky-homeserver/src/database/tables/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub const BLOBS_TABLE: &str = "blobs";

impl DB {
pub fn get_blob(
&mut self,
&self,
public_key: &PublicKey,
path: &str,
) -> anyhow::Result<Option<bytes::Bytes>> {
Expand Down
41 changes: 33 additions & 8 deletions pubky-homeserver/src/database/tables/entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ use pubky_common::{
timestamp::Timestamp,
};

use crate::database::DB;
use crate::database::{DB, MAX_LIST_LIMIT};

use super::events::Event;

/// full_path(pubky/*path) => Entry.
pub type EntriesTable = Database<Str, Bytes>;

pub const ENTRIES_TABLE: &str = "entries";

const MAX_LIST_LIMIT: u16 = 100;

impl DB {
pub fn put_entry(
&mut self,
Expand Down Expand Up @@ -56,6 +56,19 @@ impl DB {
.entries
.put(&mut wtxn, &key, &entry.serialize())?;

if path.starts_with("pub/") {
let url = format!("pubky://{key}");
let event = Event::put(&url);
let value = event.serialize();

let key = entry.timestamp.to_string();

self.tables.events.put(&mut wtxn, &key, &value)?;

// TODO: delete older events.
// TODO: move to events.rs
}

wtxn.commit()?;

Ok(())
Expand All @@ -74,6 +87,21 @@ impl DB {

let deleted_entry = self.tables.entries.delete(&mut wtxn, &key)?;

// create DELETE event
if path.starts_with("pub/") {
let url = format!("pubky://{key}");

let event = Event::delete(&url);
let value = event.serialize();

let key = Timestamp::now().to_string();

self.tables.events.put(&mut wtxn, &key, &value)?;

// TODO: delete older events.
// TODO: move to events.rs
}

deleted_entry & deleted_blobs
} else {
false
Expand Down Expand Up @@ -198,7 +226,7 @@ pub struct Entry {
/// Encoding version
version: usize,
/// Modified at
timestamp: u64,
timestamp: Timestamp,
content_hash: [u8; 32],
content_length: usize,
content_type: String,
Expand All @@ -209,10 +237,7 @@ pub struct Entry {

impl Entry {
pub fn new() -> Self {
Self {
timestamp: Timestamp::now().into_inner(),
..Default::default()
}
Default::default()
}

// === Setters ===
Expand Down
58 changes: 58 additions & 0 deletions pubky-homeserver/src/database/tables/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
//! Server events (Put and Delete entries)
//!
//! Useful as a realtime sync with Indexers until
//! we implement more self-authenticated merkle data.

use heed::{
types::{Bytes, Str},
Database,
};
use postcard::{from_bytes, to_allocvec};
use serde::{Deserialize, Serialize};

/// Event [Timestamp] base32 => Encoded event.
pub type EventsTable = Database<Str, Bytes>;

pub const EVENTS_TABLE: &str = "events";

#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq)]
pub enum Event {
Put(String),
Delete(String),
}

impl Event {
pub fn put(url: &str) -> Self {
Self::Put(url.to_string())
}

pub fn delete(url: &str) -> Self {
Self::Delete(url.to_string())
}

pub fn serialize(&self) -> Vec<u8> {
to_allocvec(self).expect("Session::serialize")
}

pub fn deserialize(bytes: &[u8]) -> core::result::Result<Self, postcard::Error> {
if bytes[0] > 1 {
panic!("Unknown Event version");
}

from_bytes(bytes)
}

pub fn url(&self) -> &str {
match self {
Event::Put(url) => url,
Event::Delete(url) => url,
}
}

pub fn operation(&self) -> &str {
match self {
Event::Put(_) => "PUT",
Event::Delete(_) => "DEL",
}
}
}
2 changes: 2 additions & 0 deletions pubky-homeserver/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::server::AppState;
use self::pkarr::pkarr_router;

mod auth;
mod feed;
mod pkarr;
mod public;
mod root;
Expand All @@ -25,6 +26,7 @@ fn base(state: AppState) -> Router {
.route("/:pubky/*path", put(public::put))
.route("/:pubky/*path", get(public::get))
.route("/:pubky/*path", delete(public::delete))
.route("/events/", get(feed::feed))
.layer(CookieManagerLayer::new())
// TODO: revisit if we enable streaming big payloads
// TODO: maybe add to a separate router (drive router?).
Expand Down
71 changes: 71 additions & 0 deletions pubky-homeserver/src/routes/feed.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use std::collections::HashMap;

use axum::{
body::Body,
extract::{Query, State},
http::{header, Response, StatusCode},
response::IntoResponse,
};

use crate::{
database::{tables::events::Event, MAX_LIST_LIMIT},
error::Result,
server::AppState,
};

pub async fn feed(
State(state): State<AppState>,
Query(params): Query<HashMap<String, String>>,
) -> Result<impl IntoResponse> {
let txn = state.db.env.read_txn()?;

let limit = params
.get("limit")
.and_then(|l| l.parse::<u16>().ok())
.unwrap_or(MAX_LIST_LIMIT)
.min(MAX_LIST_LIMIT);

let mut cursor = params
.get("cursor")
.map(|c| c.as_str())
.unwrap_or("0000000000000");

// Guard against bad cursor
if cursor.len() < 13 {
cursor = "0000000000000"
}

let mut result: Vec<String> = vec![];
let mut next_cursor = cursor.to_string();

for _ in 0..limit {
match state
.db
.tables
.events
.get_greater_than(&txn, &next_cursor)?
{
Some((timestamp, event_bytes)) => {
let event = Event::deserialize(event_bytes)?;

let line = format!("{} {}", event.operation(), event.url());
next_cursor = timestamp.to_string();

result.push(line);
}
None => break,
};
}

if !result.is_empty() {
result.push(format!("cursor: {next_cursor}"))
}

txn.commit()?;

Ok(Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "text/plain")
.body(Body::from(result.join("\n")))
.unwrap())
}
4 changes: 2 additions & 2 deletions pubky-homeserver/src/routes/public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub async fn put(
}

pub async fn get(
State(mut state): State<AppState>,
State(state): State<AppState>,
pubky: Pubky,
path: EntryPath,
Query(params): Query<HashMap<String, String>>,
Expand Down Expand Up @@ -96,7 +96,7 @@ pub async fn get(

return Ok(Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/json")
.header(header::CONTENT_TYPE, "text/plain")
.body(Body::from(vec.join("\n")))
.unwrap());
}
Expand Down
Loading
Loading