Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/aeternity/elixir-research
Browse files Browse the repository at this point in the history
…into GH-125
  • Loading branch information
cheezus1 committed Dec 15, 2017
2 parents 4775633 + 26e3f23 commit b6a9911
Show file tree
Hide file tree
Showing 30 changed files with 234 additions and 129 deletions.
6 changes: 5 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ before_install:

script:
- mix coveralls -u
- mix credo list --ignore readability,design,refactor

cache:
- cargo
- $TRAVIS_BUILD_DIR/target
- $HOME/.cargo
- deps
- _build
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Aecore.Utils.Blockchain.BlockValidation do
defmodule Aecore.Chain.BlockValidation do

alias Aecore.Keys.Worker, as: KeyManager
alias Aecore.Pow.Cuckoo
Expand All @@ -7,7 +7,7 @@ defmodule Aecore.Utils.Blockchain.BlockValidation do
alias Aecore.Structures.Header
alias Aecore.Structures.SignedTx
alias Aecore.Chain.ChainState
alias Aecore.Utils.Blockchain.Difficulty
alias Aecore.Chain.Difficulty

@spec validate_block!(Block.block(), Block.block(), map(), list()) :: {:error, term()} | :ok
def validate_block!(new_block, previous_block, chain_state, blocks_for_difficulty_calculation) do
Expand Down Expand Up @@ -142,7 +142,7 @@ defmodule Aecore.Utils.Blockchain.BlockValidation do

@spec calculate_root_hash(list()) :: binary()
def calculate_root_hash(txs) do
if length(txs) == 0 do
if Enum.empty?(txs) do
<<0::256>>
else
merkle_tree =
Expand Down
2 changes: 1 addition & 1 deletion apps/aecore/lib/aecore/chain/chain_state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ defmodule Aecore.Chain.ChainState do
{account, :erlang.term_to_binary(data)}
end

if length(merkle_tree_data) == 0 do
if Enum.empty?(merkle_tree_data) do
<<0::256>>
else
merkle_tree =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Aecore.Utils.Blockchain.Difficulty do
defmodule Aecore.Chain.Difficulty do

@number_of_blocks 100
@max_difficulty_change 2
Expand Down
6 changes: 3 additions & 3 deletions apps/aecore/lib/aecore/chain/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ defmodule Aecore.Chain.Worker do
alias Aecore.Structures.Block
alias Aecore.Chain.ChainState
alias Aecore.Txs.Pool.Worker, as: Pool
alias Aecore.Utils.Blockchain.BlockValidation
alias Aecore.Chain.BlockValidation
alias Aecore.Peers.Worker, as: Peers
alias Aecore.Persistence.Worker, as: Persistence
alias Aecore.Utils.Blockchain.Difficulty
alias Aecore.Chain.Difficulty

use GenServer

Expand Down Expand Up @@ -202,7 +202,7 @@ defmodule Aecore.Chain.Worker do
Persistence.write_block_by_hash(block)

## Block was validated, now we can send it to other peers
Peers.broadcast_to_all({:new_block, block})
Peers.broadcast_block(block)

{:reply, :ok, {updated_block_map, updated_latest_block_chainstate, new_txs_index}}
catch
Expand Down
6 changes: 3 additions & 3 deletions apps/aecore/lib/aecore/miner/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ defmodule Aecore.Miner.Worker do
use GenStateMachine, callback_mode: :state_functions

alias Aecore.Chain.Worker, as: Chain
alias Aecore.Utils.Blockchain.BlockValidation
alias Aecore.Utils.Blockchain.Difficulty
alias Aecore.Chain.BlockValidation
alias Aecore.Chain.Difficulty
alias Aecore.Structures.Header
alias Aecore.Structures.Block
alias Aecore.Pow.Cuckoo
Expand Down Expand Up @@ -200,7 +200,7 @@ defmodule Aecore.Miner.Worker do
Block.current_block_version()
)

Logger.debug("start nonce #{start_nonce}. Final nonce = #{start_nonce + @nonce_per_cycle}")
Logger.debug(fn -> "start nonce #{start_nonce}. Final nonce = #{start_nonce + @nonce_per_cycle}" end)

case Cuckoo.generate(%{unmined_header | nonce: start_nonce + @nonce_per_cycle}) do
{:ok, mined_header} ->
Expand Down
32 changes: 21 additions & 11 deletions apps/aecore/lib/aecore/peers/sync.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ defmodule Aecore.Peers.Sync do

alias Aecore.Peers.Worker, as: Peers
alias Aehttpclient.Client, as: HttpClient
alias Aecore.Chain.Worker, as: Chain
alias Aecore.Chain.BlockValidation
alias Aeutil.Serialization

use GenServer

Expand Down Expand Up @@ -60,8 +63,8 @@ defmodule Aecore.Peers.Sync do
end

def handle_call({:ask_peers_for_unknown_blocks, peers}, _from, state) do
state = Enum.reduce(peers, state, fn ({uri, latest_block_hash}, acc) ->
Map.merge(acc, check_peer_block(uri, latest_block_hash, %{}))
state = Enum.reduce(peers, state, fn ({_, %{uri: uri, latest_block: latest_block}}, acc) ->
Map.merge(acc, check_peer_block(uri, latest_block, %{}))
end)

{:reply, :ok, state}
Expand Down Expand Up @@ -103,7 +106,10 @@ defmodule Aecore.Peers.Sync do
peers_count == 0 ->
{:error, "No peers"}
peers_count < @peers_target_count ->
all_peers = Map.keys(Peers.all_peers())
all_peers =
Peers.all_peers()
|> Map.values()
|> Enum.map(fn(%{uri: uri}) -> uri end)
new_count = get_newpeers_and_add(all_peers)
if new_count > 0 do
Logger.info(fn -> "Aquired #{new_count} new peers" end)
Expand All @@ -126,8 +132,11 @@ defmodule Aecore.Peers.Sync do
|> Enum.take(@peers_target_count - known_count)
|> Enum.reduce([], fn(peer, acc) ->
case (HttpClient.get_peers(peer)) do
{:ok, list} -> Enum.concat(acc, Map.keys(list))
:error -> acc
{:ok, list} ->
Enum.concat(acc, Enum.map(Map.values(list),
fn(%{"uri" => uri}) -> uri end))
:error ->
acc
end
end)
|> Enum.reduce([], fn(peer, acc) ->
Expand All @@ -142,8 +151,10 @@ defmodule Aecore.Peers.Sync do
#if we have successfully added less then number_of_peers_to_add peers then try to add another one
if acc < number_of_peers_to_add do
case Peers.add_peer(peer) do
:ok -> acc + 1
_ -> acc
:ok ->
acc + 1
_ ->
acc
end
else
acc
Expand Down Expand Up @@ -186,15 +197,14 @@ defmodule Aecore.Peers.Sync do
case Chain.has_block?(block_hash) do
false ->
case(HttpClient.get_block({peer_uri, block_hash})) do
{:ok, peer_block} ->
deserialized_block = Serialization.block(peer_block, :deserialize)
{:ok, deserialized_block} ->
try do
BlockValidation.single_validate_block(deserialized_block)
peer_block_hash =
BlockValidation.block_header_hash(deserialized_block.header)

if block_hash == Base.encode16(peer_block_hash) do
check_peer_block(peer_uri, peer_block.header.prev_hash,
if(block_hash == Base.encode16(peer_block_hash)) do
check_peer_block(peer_uri, Serialization.hex_binary(deserialized_block.header.prev_hash, :serialize),
Map.put(state, peer_block_hash, deserialized_block))
else
state
Expand Down
141 changes: 79 additions & 62 deletions apps/aecore/lib/aecore/peers/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ defmodule Aecore.Peers.Worker do

alias Aehttpclient.Client
alias Aecore.Structures.Block
alias Aecore.Utils.Blockchain.BlockValidation
alias Aehttpclient.Client, as: HttpClient
alias Aecore.Utils.Serialization
alias Aecore.Structures.SignedTx
alias Aecore.Chain.BlockValidation

require Logger

Expand Down Expand Up @@ -39,6 +38,13 @@ defmodule Aecore.Peers.Worker do
GenServer.call(__MODULE__, :check_peers)
end

@spec all_uris() :: list(binary())
def all_uris() do
all_peers()
|> Map.values()
|> Enum.map(fn(%{uri: uri}) -> uri end)
end

@spec all_peers() :: map()
def all_peers() do
GenServer.call(__MODULE__, :all_peers)
Expand All @@ -51,20 +57,9 @@ defmodule Aecore.Peers.Worker do
|> Base.encode16()
end


@doc """
Making async post requests to the users
`type` is related to the uri e.g. /new_block
"""
@spec broadcast_to_all({type :: atom(), data :: term()}) :: :ok | :error
def broadcast_to_all({type, data}) do
data = prep_data(type, data)
GenServer.cast(__MODULE__, {:broadcast_to_all, {type, data}})
end

@spec schedule_add_peer(uri :: term()) :: term()
def schedule_add_peer(uri) do
GenServer.cast(__MODULE__, {:schedule_add_peer, uri})
@spec schedule_add_peer(uri :: term(), nonce :: integer()) :: term()
def schedule_add_peer(uri, nonce) do
GenServer.cast(__MODULE__, {:schedule_add_peer, uri, nonce})
end

@doc """
Expand All @@ -86,6 +81,22 @@ defmodule Aecore.Peers.Worker do
end
end

@spec broadcast_block(%Block{}) :: :ok
def broadcast_block(block) do
spawn fn ->
Client.send_block(block, all_uris())
end
:ok
end

@spec broadcast_tx(%SignedTx{}) :: :ok
def broadcast_tx(tx) do
spawn fn ->
Client.send_tx(tx, all_uris())
end
:ok
end

## Server side

def init(initial_peers) do
Expand Down Expand Up @@ -113,45 +124,44 @@ defmodule Aecore.Peers.Worker do
is updated if the one in the latest GET /info request is different.
"""
def handle_call(:check_peers, _from, %{peers: peers} = state) do
filtered_peers = :maps.filter(fn(peer, _) ->
case Client.get_info(peer) do
{:ok, info} -> info.genesis_block_hash == genesis_block_header_hash()
_ -> false
filtered_peers = :maps.filter(fn(_, %{uri: uri}) ->
case Client.get_info(uri) do
{:ok, info} ->
info.genesis_block_hash == genesis_block_header_hash()
_ ->
false
end
end, peers)

updated_peers =
for {peer, current_block_hash} <- filtered_peers, into: %{} do
{_, info} = Client.get_info(peer)
if info.current_block_hash != current_block_hash do
{peer, info.current_block_hash}
for {nonce, %{uri: uri, latest_block: latest_block}} <- filtered_peers, into: %{} do
{_, info} = Client.get_info(uri)
if info.current_block_hash != latest_block do
{nonce, %{uri: uri, latest_block: info.current_block_hash}}
else
{peer, current_block_hash}
{nonce, %{uri: uri, latest_block: latest_block}}
end
end

removed_peers_count = Enum.count(peers) - Enum.count(filtered_peers)
if removed_peers_count > 0 do
Logger.info(fn -> "#{Enum.count(peers) - Enum.count(filtered_peers)} peers were removed after the check" end)
Logger.info(fn -> "#{removed_peers_count} peers were removed after the check" end)
end

{:reply, :ok, %{state | peers: updated_peers}}
end

def handle_call(:all_peers, _from, %{peers: peers} = state) do
{:reply, peers, %{state | peers: peers}}
{:reply, peers, state}
end

## Async operations

def handle_cast({:broadcast_to_all, {type, data}}, %{peers: peers} = state) do
send_to_peers(type, data, Map.keys(peers))
{:noreply, state}
end

def handle_cast({:schedule_add_peer, uri}, state) do
{:reply, _, state} = add_peer(uri, state)
{:noreply, state}
def handle_cast({:schedule_add_peer, uri, nonce}, %{peers: peers} = state) do
if Map.has_key?(peers, nonce) do
{:noreply, state}
else
{:reply, _, state} = add_peer(uri, state)
{:noreply, state}
end
end

def handle_cast(any, state) do
Expand All @@ -162,28 +172,34 @@ defmodule Aecore.Peers.Worker do
## Internal functions
defp add_peer(uri, state) do
%{peers: peers} = state
if Map.has_key?(peers, uri) do
state_has_uri = peers
|> Map.values()
|> Enum.map(fn(%{uri: uri}) -> uri end)
|> Enum.member?(uri)

if state_has_uri do
Logger.debug(fn ->
"Skipped adding #{uri}, already known" end)
{:reply, {:error, "Peer already known"}, state}
else
case check_peer(uri, get_peer_nonce()) do
{:ok, info} ->
if should_a_peer_be_added(map_size(peers)) do
peers_update1 =
if map_size(peers) >= @peers_max_count do
random_peer = Enum.random(Map.keys(peers))
Logger.debug(fn -> "Max peers reached. #{random_peer} removed" end)
Map.delete(peers, random_peer)
else
peers
end
updated_peers = Map.put(peers_update1, uri, info.current_block_hash)
Logger.info(fn -> "Added #{uri} to the peer list" end)
{:reply, :ok, %{state | peers: updated_peers}}
if(!Map.has_key?(peers, info.peer_nonce)) do
if should_a_peer_be_added(map_size(peers)) do
peers_update1 = trim_peers(peers)
updated_peers =
Map.put(peers_update1, info.peer_nonce,
%{uri: uri, latest_block: info.current_block_hash})
Logger.info(fn -> "Added #{uri} to the peer list" end)
{:reply, :ok, %{state | peers: updated_peers}}
else
Logger.debug(fn -> "Max peers reached. #{uri} not added" end)
{:reply, :ok, state}
end
else
Logger.debug(fn -> "Max peers reached. #{uri} not added" end)
{:reply, :ok, state}
Logger.debug(fn ->
"Skipped adding #{uri}, same nonce already present" end)
{:reply, {:error, "Peer already known"}, state}
end
{:error, "Equal peer nonces"} ->
{:reply, :ok, state}
Expand All @@ -194,14 +210,18 @@ defmodule Aecore.Peers.Worker do
end
end

defp create_nonce_table() do
:ets.new(:nonce_table, [:named_table])
defp trim_peers(peers) do
if map_size(peers) >= @peers_max_count do
random_peer = Enum.random(Map.keys(peers))
Logger.debug(fn -> "Max peers reached. #{random_peer} removed" end)
Map.delete(peers, random_peer)
else
peers
end
end

defp send_to_peers(uri, data, peers) do
for peer <- peers do
HttpClient.post(peer, data, uri)
end
defp create_nonce_table() do
:ets.new(:nonce_table, [:named_table])
end

defp check_peer(uri, own_nonce) do
Expand Down Expand Up @@ -230,7 +250,4 @@ defmodule Aecore.Peers.Worker do
|| :rand.uniform() < @probability_of_peer_remove_when_max
end

defp prep_data(:new_tx, %{} = data), do: Serialization.tx(data, :serialize)
defp prep_data(:new_block, %{} = data), do: Serialization.block(data, :serialize)

end
Loading

0 comments on commit b6a9911

Please sign in to comment.