Skip to content

Commit

Permalink
Add graph_out to antelope.oracles
Browse files Browse the repository at this point in the history
  • Loading branch information
0237h committed Nov 18, 2023
1 parent 33a5cac commit fa218fe
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 137 deletions.
2 changes: 1 addition & 1 deletion antelope.oracles/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "antelope_oracles"
version = "0.0.6"
version = "0.0.7"
authors = [
"Denis <denis@pinax.network>",
"Etienne <etienne@pinax.network>"
Expand Down
10 changes: 10 additions & 0 deletions antelope.oracles/schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
CREATE TABLE IF NOT EXISTS quotes (
q_pair String,
q_id String,
q_median String,
q_owner String,
q_timestamp String,
q_value String
)
ENGINE = MergeTree()
ORDER BY (q_pair)
134 changes: 5 additions & 129 deletions antelope.oracles/src/maps.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
use antelope::Symbol;
use substreams::log;
use substreams::errors::Error;
use substreams_antelope::Block;

use crate::abi;
use crate::antelope_oracles::*;
use crate::utils;
use antelope::{Asset, Name, SymbolCode};
use antelope::{Name, SymbolCode};

#[substreams::handlers::map]
fn map_prices(params: String, block: Block) -> Result<Prices, Error> {
fn map_prices(_params: String, block: Block) -> Result<Prices, Error> {
let mut response = vec![];

for trx in block.all_transaction_traces() {
for db_op in &trx.db_ops {
let contract = db_op.code.clone();
let raw_primary_key = Name::from(db_op.primary_key.as_str()).value;
let symcode = SymbolCode::from(raw_primary_key);
let account = db_op.scope.clone();
let _symcode = SymbolCode::from(raw_primary_key);
let _account = db_op.scope.clone();

if contract == "oracle.defi" && db_op.table_name == "prices" {
//log::debug!("contract={:?} / table_name={:?}", contract, db_op.table_name);
Expand Down Expand Up @@ -56,7 +54,7 @@ fn map_prices(params: String, block: Block) -> Result<Prices, Error> {
}

#[substreams::handlers::map]
fn map_quotes(params: String, block: Block) -> Result<Quotes, Error> {
fn map_quotes(_params: String, block: Block) -> Result<Quotes, Error> {
let mut response = vec![];

for trx in block.all_transaction_traces() {
Expand Down Expand Up @@ -93,125 +91,3 @@ fn map_quotes(params: String, block: Block) -> Result<Quotes, Error> {

Ok(Quotes { quotes: response })
}

// Work In Progress: Extract pairs information from `pairs` table of `delphioracle`
/*#[substreams::handlers::map]
fn map_pairs(params: String, block: Block) -> Result<Pairs, Error> {
let mut response = vec![];
// query-params
/*let filter_from = utils::create_filters(params.as_str(), "from");
let filter_to = utils::create_filters(params.as_str(), "to");
let filter_symcode = utils::create_filters(params.as_str(), "symcode");
let filter_contract = utils::create_filters(params.as_str(), "contract");
let filter_to_or_from = utils::create_filters(params.as_str(), "to_or_from");
let filter_quantity_lt = utils::create_i64_filter(params.as_str(), "quantity_lt");
let filter_quantity_gt = utils::create_i64_filter(params.as_str(), "quantity_gt");
let filter_quantity_lte = utils::create_i64_filter(params.as_str(), "quantity_lte");
let filter_quantity_gte = utils::create_i64_filter(params.as_str(), "quantity_gte");*/
for trx in block.all_transaction_traces() {
for db_op in &trx.db_ops {
let contract = db_op.code.clone();
if contract == "delphioracle" {// && db_op.table_name == "pairs" {
log::debug!("contract={:?} / table_name={:?} / is datapoints ? {:?}", contract, db_op.table_name, db_op.table_name == "datapoints");
log::debug!("new_data_json={:?}", db_op.new_data_json);
response.push(Pair {
active: false,
bounty_awarded: false,
bounty_edited_by_custodians: false,
proposer: "test".to_string(),
name: "test".to_string(),
bounty_amount: "test".to_string(),
approving_custodians: vec![],
approving_oracles: vec![],
base_symbol: "test/test".to_string(),
base_type: 1,
base_contract: "test_contract".to_string(),
quote_symbol: "test".to_string(),
quote_type: 1,
quote_contract: "test_contract".to_string(),
quoted_precision: 8
});
/*match abi::Pairs::try_from(db_op.new_data_json.as_str()) {
Ok(pair) => {
response.push(Pair {
active: false,
bounty_awarded: false,
bounty_edited_by_custodians: false,
proposer: "test".to_string(),
name: "test".to_string(),
bounty_amount: "test".to_string(),
approving_custodians: vec![],
approving_oracles: vec![],
base_symbol: "test/test".to_string(),
base_type: 1,
base_contract: "test_contract".to_string(),
quote_symbol: "test".to_string(),
quote_type: 1,
quote_contract: "test_contract".to_string(),
quoted_precision: 8
});
}
Err(_) => continue,
}*/
}
}
/* // action traces
for trace in &trx.action_traces {
let action_trace = trace.action.as_ref().unwrap();
if action_trace.account != trace.receiver { continue; }
if action_trace.name != "transfer" { continue; }
match abi::Transfer::try_from(action_trace.json_data.as_str()) {
Ok(data) => {
let quantity = Asset::from(data.quantity.as_str());
let symcode = quantity.symbol.code().to_string();
let precision = quantity.symbol.precision().into();
let amount = quantity.amount;
let contract = action_trace.account.clone();
// filter by params
if !filter_from.is_empty() && !filter_from.contains(&data.from) { continue; }
if !filter_to.is_empty() && !filter_to.contains(&data.to) { continue; }
if !filter_symcode.is_empty() && !filter_symcode.contains(&symcode) { continue; }
if !filter_contract.is_empty() && !filter_contract.contains(&contract) { continue; }
if !filter_to_or_from.is_empty() && !(filter_to_or_from.contains(&data.to) || filter_to_or_from.contains(&data.from)) { continue; }
if filter_quantity_lt.is_some() && !(quantity.amount < filter_quantity_lt.unwrap()) { continue; }
if filter_quantity_gt.is_some() && !(quantity.amount > filter_quantity_gt.unwrap()) { continue; }
if filter_quantity_lte.is_some() && !(quantity.amount <= filter_quantity_lte.unwrap()) { continue; }
if filter_quantity_gte.is_some() && !(quantity.amount >= filter_quantity_gte.unwrap()) { continue; }
response.push(TransferEvent {
// trace information
trx_id: trx.id.clone(),
action_ordinal: trace.action_ordinal,
// contract & scope
contract,
action: action_trace.name.clone(),
symcode,
// payload
from: data.from,
to: data.to,
quantity: data.quantity,
memo: data.memo,
// extras
precision,
amount,
value: utils::to_value(quantity),
});
}
Err(_) => continue,
}
}*/
}
Ok(Pairs { pairs: response })
}*/
26 changes: 21 additions & 5 deletions antelope.oracles/src/sinks.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use antelope::Asset;
use substreams::errors::Error;
use substreams_entity_change::pb::entity::{entity_change, EntityChanges};
use substreams_entity_change::{pb::entity::EntityChanges, tables::Tables};
use substreams_sink_kv::pb::sf::substreams::sink::kv::v1::KvOperations;
use substreams_database_change::pb::database::{table_change, DatabaseChanges};
use substreams::pb::substreams::Clock;

use crate::antelope_oracles::{Pairs, Quotes};
use crate::antelope_oracles::Quotes;

// Work In Progress: Make a generic db_out for oracle information
#[substreams::handlers::map]
Expand All @@ -31,12 +30,29 @@ pub fn kv_out(map_quotes: Quotes, clock: Clock) -> Result<KvOperations, Error> {
let seconds = clock.timestamp.unwrap().seconds;
let epoch = (seconds / 86400) * 86400;

let day = epoch / 86400;
let _day = epoch / 86400;

for quote in map_quotes.quotes {
let key = format!("delphioracle:{}:{}", quote.value.as_ref().ok_or(0).unwrap().timestamp, quote.pair);
kv_out.push_new(key, &quote.value.ok_or(0).unwrap().value.to_ne_bytes(), 1);
}

Ok(kv_out)
}
}

#[substreams::handlers::map]
pub fn graph_out(map_quotes: Quotes) -> Result<EntityChanges, Error> {
let mut table = Tables::new();

for quote in map_quotes.quotes {
let datapoint = quote.value.unwrap();
let row = table.create_row("quotes", &quote.pair).set("q_pair", &quote.pair);
row.set("q_id", datapoint.id);
row.set("q_median", datapoint.median);
row.set("q_owner", datapoint.owner);
row.set("q_timestamp", datapoint.timestamp);
row.set("q_value", datapoint.value);
}

Ok(table.to_entity_changes())
}
19 changes: 17 additions & 2 deletions antelope.oracles/substreams.yaml
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
specVersion: v0.1.0
package:
name: antelope_oracles
version: v0.0.6
version: v0.0.7
url: https://github.com/pinax-network/substreams
doc: Antelope `eosio.token` based action traces & database operations.

imports:
entities: https://github.com/streamingfast/substreams-sink-entity-changes/releases/download/v1.3.0/substreams-sink-entity-changes-v1.3.0.spkg
kv: https://github.com/streamingfast/substreams-sink-kv/releases/download/v0.1.2/substreams-sink-kv-v0.1.2.spkg
database_change: https://github.com/streamingfast/substreams-database-change/releases/download/v1.0.0/substreams-database-change-v1.0.0.spkg

Expand All @@ -21,6 +22,13 @@ protobuf:
- ./proto/v1

modules:
- name: store_pairs
kind: store
updatePolicy: set_if_not_exists
valueType: proto:antelope.oracles.v1.Pairs
inputs:
- source: sf.antelope.type.v1.Block

- name: map_prices
kind: map
inputs:
Expand Down Expand Up @@ -50,4 +58,11 @@ modules:
inputs:
- map: map_quotes
output:
type: proto:sf.substreams.sink.database.v1.DatabaseChanges
type: proto:sf.substreams.sink.database.v1.DatabaseChanges

- name: graph_out
kind: map
inputs:
- map: map_quotes
output:
type: proto:sf.substreams.sink.entity.v1.EntityChanges

0 comments on commit fa218fe

Please sign in to comment.