Skip to content

Commit

Permalink
Merge pull request #11 from delta-mpc/lr
Browse files Browse the repository at this point in the history
Lr
  • Loading branch information
mh739025250 authored Jul 14, 2022
2 parents aca0a3e + a08a446 commit 575b75d
Show file tree
Hide file tree
Showing 11 changed files with 901 additions and 17 deletions.
18 changes: 12 additions & 6 deletions delta/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
import abc
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Tuple, Union
from typing import Any, Dict, List, Tuple, Union, Set

import networkx as nx
import numpy as np
import numpy.typing as npt
import pandas

from .strategy import Strategy
Expand Down Expand Up @@ -35,9 +36,14 @@
"TaskConstructer",
"AggValueType",
"AggResultType",
"EarlyStop",
]


class EarlyStop(Exception):
    """Control-flow exception raised during a reduce step to stop the
    remaining task steps early (the task runner catches it and breaks
    out of the step loop)."""

    pass


class DataLocation(str, Enum):
CLIENT = "CLIENT"
SERVER = "SERVER"
Expand Down Expand Up @@ -216,7 +222,7 @@ def type(self) -> OpType:
return OpType.REDUCE


AggValueType = Union[np.ndarray, pandas.DataFrame, pandas.Series]
AggValueType = Union[npt.NDArray, pandas.DataFrame, pandas.Series]

AggResultType = Dict[str, AggValueType]

Expand Down Expand Up @@ -463,18 +469,18 @@ class TaskConstructer(object):
def build(constructor: TaskConstructer) -> Task:
graph = nx.DiGraph()
# bfs build graph
queue: List[GraphNode | Operator] = list(constructor.outputs)
queue: Set[GraphNode | Operator] = set(constructor.outputs)
while len(queue):
next_queue = []
next_queue = set()
for node in queue:
if isinstance(node, GraphNode):
if node.src:
graph.add_edge(node.src, node)
next_queue.append(node.src)
next_queue.add(node.src)
elif isinstance(node, Operator):
for src in node.inputs:
graph.add_edge(src, node)
next_queue.append(src)
next_queue.add(src)
queue = next_queue

# name variables
Expand Down
11 changes: 7 additions & 4 deletions delta/debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from .dataset import load_dataframe, load_dataset
from .core.task import (ClientContext, DataFormat, DataLocation, DataNode,
InputGraphNode, ServerContext, Task)
InputGraphNode, ServerContext, Task, EarlyStop)


class Aggregator(object):
Expand Down Expand Up @@ -42,7 +42,7 @@ def get(self, *vars: DataNode) -> List[Any]:
value = self._server_ctx.get(var)[0]

if value is None:
raise ValueError(f"Cannot get var {var.name}")
raise KeyError(f"Cannot get var {var.name}")

res.append(value)

Expand Down Expand Up @@ -83,7 +83,7 @@ def get(self, *vars: DataNode) -> List[Any]:
self._state[var.name] = value

if value is None:
raise ValueError(f"Cannot get var {var.name}")
raise KeyError(f"Cannot get var {var.name}")

res.append(value)

Expand Down Expand Up @@ -117,7 +117,10 @@ def debug(task: Task, **data: Any):

for step in task.steps:
step.map(client_context)
step.reduce(server_context)
try:
step.reduce(server_context)
except EarlyStop:
break

res = tuple(server_context.get(*task.outputs))
if len(res) == 1:
Expand Down
5 changes: 5 additions & 0 deletions delta/statsmodel/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .logit import LogitTask
from .mnlogit import MNLogitTask


__all__ = ["LogitTask", "MNLogitTask"]
238 changes: 238 additions & 0 deletions delta/statsmodel/logit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
from __future__ import annotations

from abc import abstractmethod
from typing import Any, Callable, Dict, List, Tuple

import numpy as np
import numpy.typing as npt
from delta.core.strategy import CURVE_TYPE, AnalyticsStrategy
from delta.core.task import (
DataLocation,
DataNode,
GraphNode,
InputGraphNode,
MapOperator,
MapReduceOperator,
)
from delta.task import HorizontalTask

from .optimizer import fit

__all__ = ["LogitTask"]

FloatArray = npt.NDArray[np.float_]
IntArray = npt.NDArray[np.int_]


def sigmoid(x: FloatArray) -> FloatArray:
    """Numerically stable logistic function ``1 / (1 + exp(-x))``.

    The naive form overflows in ``exp(-x)`` for large negative ``x``
    (RuntimeWarning and loss of precision). Splitting on the sign of
    ``x`` keeps every evaluated exponent non-positive, so no overflow
    can occur while the result is unchanged.
    """
    x = np.asarray(x, dtype=np.float64)
    out = np.empty_like(x)
    pos = x >= 0
    # x >= 0: exp(-x) <= 1, safe directly.
    out[pos] = 1.0 / (1.0 + np.exp(-x[pos]))
    # x < 0: rewrite as exp(x) / (1 + exp(x)) so the exponent stays <= 0.
    ex = np.exp(x[~pos])
    out[~pos] = ex / (1.0 + ex)
    return out


def logsigmoid(x: FloatArray) -> FloatArray:
    """Numerically stable ``log(sigmoid(x))``.

    Identical to ``x - log(1 + exp(x))`` but written as
    ``-logaddexp(0, -x)``; the original form overflows in ``exp(x)``
    for large positive ``x``, while ``np.logaddexp`` computes
    ``log(exp(0) + exp(-x))`` without ever materialising the overflow.
    """
    return -np.logaddexp(0.0, -x)


def check_params(params: FloatArray, x: FloatArray) -> bool:
    """Validate that ``params`` is a 1-d vector matching ``x``'s column count.

    Raises AssertionError (with a message) when the shapes are
    inconsistent. Returns ``True`` on success so the function can also
    be used inside an ``assert`` expression — previously it returned
    ``None``, which made ``assert check_params(...)`` fail
    unconditionally.
    """
    assert params.ndim == 1, "params must be 1-dimensional"
    assert params.size == x.shape[1], "params size must equal the number of columns of x"
    return True


def loglike(params: FloatArray, y: IntArray, x: FloatArray):
    """Total log-likelihood of the logit model at ``params``.

    Uses the identity log P(y_i | x_i) = logsigmoid((2*y_i - 1) * x_i·params)
    for labels y_i in {0, 1}.
    """
    signed_linpred = (2 * y - 1) * np.dot(x, params)
    return logsigmoid(signed_linpred).sum()


def loglikeobs(params: FloatArray, y: IntArray, x: FloatArray):
    """Per-observation log-likelihood contributions (length-n vector).

    Same quantity as ``loglike`` before summation.
    """
    signed_linpred = (2 * y - 1) * np.dot(x, params)
    return logsigmoid(signed_linpred)


def score(params: FloatArray, y: IntArray, x: FloatArray):
    """Gradient of the logit log-likelihood: ``x' (y - sigmoid(x·params))``."""
    residual = y - sigmoid(x @ params)
    return x.T @ residual


def score_obs(params: FloatArray, y: IntArray, x: FloatArray):
    """Per-observation score (gradient) of the logit log-likelihood.

    Returns an ``(n, k)`` array whose row i is
    ``(y_i - sigmoid(x_i·params)) * x_i``.

    The residual ``y - sigmoid(p)`` has shape ``(n,)`` and must be
    reshaped to a column vector before broadcasting against the
    ``(n, k)`` design matrix; the original flat form only broadcast by
    accident when ``n == k`` and raised a ValueError otherwise.
    """
    p = np.dot(x, params)
    residual = y - sigmoid(p)
    return residual[:, np.newaxis] * x


def hessian(params: FloatArray, y: IntArray, x: FloatArray):
    """Hessian of the logit log-likelihood: ``-X' diag(p(1-p)) X``.

    ``y`` is unused (the logit Hessian depends only on the fitted
    probabilities) but kept for a signature parallel to ``score``.
    """
    prob = sigmoid(np.dot(x, params))
    weight = prob * (1 - prob)
    # Row-scale X' by the variance weights, then contract with X.
    return -((x.T * weight) @ x)


class LogitTask(HorizontalTask):
    """Horizontal (federated) logistic-regression estimation task.

    Subclasses implement :meth:`dataset` to declare the named input
    nodes and :meth:`preprocess` to turn those inputs into an
    ``(x, y)`` training pair; the task graph then fits the logit
    coefficients with the Newton-type optimizer from ``.optimizer``.
    """

    def __init__(
        self,
        name: str,
        min_clients: int = 2,
        max_clients: int = 2,
        wait_timeout: float = 60,
        connection_timeout: float = 60,
        precision: int = 8,
        curve: CURVE_TYPE = "secp256k1",
    ) -> None:
        """Build the task with an AnalyticsStrategy configured from the
        client-count, timeout and secure-aggregation parameters."""
        strategy = AnalyticsStrategy(
            min_clients=min_clients,
            max_clients=max_clients,
            wait_timeout=wait_timeout,
            connection_timeout=connection_timeout,
            precision=precision,
            curve=curve,
        )
        super().__init__(name, strategy)

    @abstractmethod
    def preprocess(self, **inputs: Any) -> Tuple[Any, Any]:
        """Map the raw dataset inputs (keyed as in :meth:`dataset`) to an
        ``(x, y)`` pair; implemented by subclasses."""
        ...

    @abstractmethod
    def dataset(self) -> Dict[str, InputGraphNode]:
        """Declare the named dataset input nodes; implemented by subclasses."""
        ...

    def options(self) -> Dict[str, Any]:
        """Optimizer options; subclasses may override.

        ``method``/``maxiter``/``start_params`` are consumed by
        :meth:`_build_graph`; the rest are forwarded to ``fit``.
        """
        return {
            "maxiter": 35,
            "method": "newton",
            "ord": np.inf,
            "tol": 1e-8,
            "ridge_factor": 1e-10,
        }

    def _fit(
        self,
        x_node: GraphNode,
        y_node: GraphNode,
        params_node: GraphNode,
        method: str = "newton",
        maxiter: int = 35,
        **kwargs,
    ):
        """Wire the negative log-likelihood, gradient and Hessian of the
        logit model into the distributed ``fit`` optimizer.

        Returns the graph nodes for the optimal params, the objective
        value at the optimum, and the iteration count.
        """

        # The optimizer minimizes, so negate the log-likelihood and its
        # derivatives.
        def f(params, y, x):
            return -loglike(params, y, x)

        def g(params, y, x):
            return -score(params, y, x)

        def h(params, y, x):
            return -hessian(params, y, x)

        opt_params, f_opt, iteration = fit(
            f,
            g,
            x_node,
            y_node,
            params_node,
            method,
            hessian=h,
            maxiter=maxiter,
            **kwargs,
        )
        return opt_params, f_opt, iteration

    def _build_graph(self) -> Tuple[List[InputGraphNode], List[GraphNode]]:
        """Assemble the task graph: preprocess -> init/check params -> fit.

        Returns the list of input nodes and the list of output nodes
        (optimal params, optimal objective value, iteration count).
        """
        inputs = self.dataset()
        input_nodes: List[InputGraphNode] = list(inputs.values())
        for name, node in inputs.items():
            node.name = name

        class Preprocess(MapOperator):
            """Client-side operator applying the user's preprocess to the
            named dataset inputs and normalizing the (x, y) output."""

            def __init__(
                self,
                name: str,
                inputs: List[DataNode],
                outputs: List[DataNode],
                preprocess: Callable,
                names: List[str],
            ) -> None:
                super().__init__(
                    name, inputs, outputs, preprocess=preprocess, names=names
                )
                self.preprocess = preprocess
                self.names = names

            def map(self, *args) -> Tuple[FloatArray, FloatArray]:
                # Re-associate positional inputs with their declared names.
                kwargs = dict(zip(self.names, args))
                x, y = self.preprocess(**kwargs)
                x = np.asarray(x, dtype=np.float64)
                assert x.ndim == 2, "x can only be in dim 2"
                y = np.asarray(y, dtype=np.int8).squeeze()
                assert x.shape[0] == y.shape[0], "x and y should have same items"
                return x, y

        x_node = GraphNode(
            name="x",
            location=DataLocation.CLIENT,
        )
        y_node = GraphNode(
            name="y",
            location=DataLocation.CLIENT,
        )

        preprocess_op = Preprocess(
            name="preprocess",
            inputs=list(inputs.values()),
            outputs=[x_node, y_node],
            preprocess=self.preprocess,
            names=list(inputs.keys()),
        )
        x_node.src = preprocess_op
        y_node.src = preprocess_op

        options = self.options()
        method = options.pop("method", "newton")
        maxiter = options.pop("maxiter", 35)
        start_params = options.pop("start_params", None)

        if start_params is None:
            # No user-supplied start: initialize params to zeros on the
            # server, sized from the clients' feature count.
            params_node = GraphNode(name="params", location=DataLocation.SERVER)

            class ParamsInitOp(MapReduceOperator):
                def map(self, x: FloatArray, y: IntArray):
                    params = np.zeros((x.shape[1],), dtype=np.float64)
                    return {"params": params}

                def reduce(self, data, node_count: int):
                    params = data["params"]
                    return params

            params_init_op = ParamsInitOp(
                name="params_init",
                map_inputs=[x_node, y_node],
                reduce_inputs=[],
                map_outputs=[],
                reduce_outputs=[params_node],
            )
            params_node.src = params_init_op
        else:
            # User-supplied start: validate its shape against each
            # client's data before using it.
            origin_params_node = InputGraphNode(
                name="params", location=DataLocation.SERVER, default=start_params
            )
            params_node = GraphNode(name="params", location=DataLocation.SERVER)

            class ParamsCheckOp(MapReduceOperator):
                def map(self, params: FloatArray, x: FloatArray, y: IntArray):
                    # check_params raises AssertionError itself on bad
                    # shapes; asserting its (None) return value here made
                    # this check fail unconditionally.
                    check_params(params, x)
                    return {"params": params}

                def reduce(self, data, node_count: int):
                    params = data["params"]
                    return params

            params_check_op = ParamsCheckOp(
                name="params_check",
                map_inputs=[origin_params_node, x_node, y_node],
                reduce_inputs=[],
                map_outputs=[],
                reduce_outputs=[params_node],
            )
            params_node.src = params_check_op

            input_nodes.append(origin_params_node)

        outputs = self._fit(x_node, y_node, params_node, method, maxiter, **options)

        return input_nodes, list(outputs)
Loading

0 comments on commit 575b75d

Please sign in to comment.