Skip to content

Commit

Permalink
Merge pull request #1 from noisyboiler/bump-to-latest-wampy
Browse files Browse the repository at this point in the history
Bump to latest wampy
  • Loading branch information
noisyboiler authored Sep 11, 2017
2 parents aa94d17 + 837fed0 commit e7c510f
Show file tree
Hide file tree
Showing 15 changed files with 442 additions and 366 deletions.
12 changes: 12 additions & 0 deletions nameko_wamp/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import logging

try: # Python 2.7+
from logging import NullHandler
except ImportError:
class NullHandler(logging.Handler):
def emit(self, record):
pass


root = logging.getLogger(__name__)
root.addHandler(NullHandler())
152 changes: 0 additions & 152 deletions nameko_wamp/extensions/__init__.py
Original file line number Diff line number Diff line change
@@ -1,152 +0,0 @@
import logging

from nameko.extensions import ProviderCollector, SharedExtension, Extension
from wampy.errors import WampyError
from wampy.peers.clients import Client
from wampy.peers.routers import Crossbar as Router
from wampy.roles.callee import CalleeProxy
from wampy.roles.subscriber import TopicSubscriber

from nameko_wamp.constants import WAMP_CONFIG_KEY
from nameko_wamp.messages import NamekoMessageHandler

logger = logging.getLogger(__name__)


class WampClientProxy(Extension):

def setup(self):
self.config_path = self.container.config[
WAMP_CONFIG_KEY]['config_path']
self.router = Router(config_path=self.config_path)
self.client = Client(
router=self.router, name="nameko-wamp client proxy"
)

def start(self):
self.client.start()

def stop(self):
try:
self.client.stop()
except AttributeError:
pass


class WampTopicProxy(SharedExtension, ProviderCollector):

def setup(self):
self.name = "{}: TopicProxy".format(self.container.service_cls.name)

self._gt = None
self._topics = []
self.config_path = self.container.config[
WAMP_CONFIG_KEY]['config_path']
self.router = Router(config_path=self.config_path)

def start(self):
# we need all entrypoints setup methods to have executed before we can
# compile a list of topics to subscribe to
self._register_topics()
if not self._topics:
logger.warning(
"At least one topic should be subscribed to by: %s", self.name
)

self.client = TopicSubscriber(
topics=self._topics,
callback=self.message_handler,
router=self.router,
name=self.name,
)

self._gt = self.container.spawn_managed_thread(self._consume)

def stop(self):
try:
self.client.stop()
except AttributeError:
pass

self._gt = None

def message_handler(self, *args, **kwargs):
message = kwargs['message']
topic = kwargs['meta']['topic']

for provider in self._providers:
# alternatively could mark providers with subscription IDs?
if provider.topic == topic:
provider.handle_message(message)

def _register_topics(self):
for provider in self._providers:
self._topics.append(provider.topic)

def _consume(self):
self.client.start()


class WampCalleeProxy(SharedExtension, ProviderCollector):

@property
def procedure_names(self):
return self._procedure_callback_map.keys()

def setup(self):
self.name = "{}: CalleeProxy".format(self.container.service_cls.name)
logger.info("setting up WampCalleeProxy for: %s", self.name)

self._gt = None
self._procedure_callback_map = {}

def start(self):
# we need all entrypoints setup methods to have executed before we can
# compile a list of procedure names
self._register_procedures()
if not self.procedure_names:
logger.warning(
"At least one proceure should be registered by: %s", self.name
)

self.config_path = self.container.config[
WAMP_CONFIG_KEY]['config_path']

self.router = Router(config_path=self.config_path)
self.client = CalleeProxy(
router=self.router,
procedure_names=self.procedure_names,
callback=self.message_handler,
message_handler=NamekoMessageHandler,
name=self.name,
)

self._gt = self.container.spawn_managed_thread(self._consume)

def stop(self):
try:
self.client.stop()
except AttributeError:
pass

self._gt = None

def message_handler(self, **message):
meta = message.pop("meta")
procedure_name = meta['procedure_name']
request_id = meta['request_id']

for provider in self._providers:
if provider.method_name == procedure_name:
provider.handle_message(message, request_id=request_id)
break
else:
raise WampyError('no providers matching procedure_name')

def _register_procedures(self):
for provider in self._providers:
self._procedure_callback_map[
provider.method_name] = provider.handle_message

def _consume(self):
self.client.start()
131 changes: 128 additions & 3 deletions nameko_wamp/extensions/dependencies.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,133 @@
from nameko.extensions import DependencyProvider
from wampy.roles.publisher import PublishProxy
import logging

from nameko.extensions import (
DependencyProvider, ProviderCollector, SharedExtension, Extension)
from wampy.message_handler import MessageHandler
from wampy.roles.caller import RpcProxy
from wampy.peers.clients import Client
from wampy.peers.routers import Crossbar as Router
from wampy.roles.publisher import PublishProxy

from nameko_wamp.constants import WAMP_CONFIG_KEY
from nameko_wamp.wamp import NamekoClient, NamekoMessageHandler

logger = logging.getLogger(__name__)


class WampClientProxy(Extension):

def setup(self):
self.config_path = self.container.config[
WAMP_CONFIG_KEY]['config_path']
self.router = Router(config_path=self.config_path)
self.client = Client(
router=self.router, name="nameko-wamp client proxy"
)

def start(self):
self.client.start()

def stop(self):
try:
self.client.stop()
except AttributeError:
pass


class WampTopicProxy(SharedExtension, ProviderCollector):

def setup(self):
self.name = "{} TopicProxy".format(self.container.service_cls.name)
logger.info("%s WampTopicProxy setting up", self.name)

self._gt = None
self._topics = []
self.config_path = self.container.config[
WAMP_CONFIG_KEY]['config_path']
self.router = Router(config_path=self.config_path)

def start(self):
# we need all entrypoints setup methods to have executed before we can
# compile a list of topics to subscribe to
self._register_topics()
if not self._topics:
logger.warning(
"At least one topic should be subscribed to by: %s", self.name
)

logger.info("registering topics: %s", self._topics)
self.client = NamekoClient(
providers=self._providers,
topics=self._topics,
router=self.router,
message_handler=MessageHandler(),
name=self.name,
)

self._gt = self.container.spawn_managed_thread(self._consume)

def stop(self):
try:
self.client.stop()
except AttributeError:
pass

self._gt = None

def _register_topics(self):
for provider in self._providers:
self._topics.append(provider.topic)

def _consume(self):
self.client.start()


class WampCalleeProxy(SharedExtension, ProviderCollector):

def setup(self):
self.name = "{}: CalleeProxy".format(self.container.service_cls.name)
logger.info("setting up WampCalleeProxy for: %s", self.name)

self._gt = None
self._procedures = []

self.config_path = self.container.config[
WAMP_CONFIG_KEY]['config_path']
self.router = Router(config_path=self.config_path)

def start(self):
# we need all entrypoints setup methods to have executed before we can
# compile a list of procedure names
self._register_procedures()
if not self._procedures:
logger.warning(
"At least one proceure should be registered by: %s", self.name
)

self.client = NamekoClient(
providers=self._providers,
router=self.router,
procedures=self._procedures,
message_handler=NamekoMessageHandler(),
name=self.name,
)

self._gt = self.container.spawn_managed_thread(self._consume)

def stop(self):
try:
self.client.stop()
except AttributeError:
pass

self._gt = None

def _register_procedures(self):
for provider in self._providers:
self._procedures.append(provider.method_name)

from . import WampClientProxy
def _consume(self):
self.client.start()


class Caller(DependencyProvider):
Expand Down
5 changes: 2 additions & 3 deletions nameko_wamp/extensions/entrypoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from nameko.extensions import Entrypoint
from wampy.messages import Yield

from . import WampTopicProxy, WampCalleeProxy
from .dependencies import WampTopicProxy, WampCalleeProxy

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -35,8 +35,7 @@ def setup(self):
def stop(self):
self.callee_proxy.unregister_provider(self)

def handle_message(self, *args, **kwargs):
request_id = kwargs.pop("request_id")
def handle_message(self, request_id, *args, **kwargs):
handle_result = partial(self.handle_result, request_id)
self.container.spawn_worker(
self, args, kwargs, handle_result=handle_result
Expand Down
20 changes: 0 additions & 20 deletions nameko_wamp/messages.py

This file was deleted.

File renamed without changes.
3 changes: 2 additions & 1 deletion nameko_wamp/testing/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
import logging
from wampy.errors import WampyError

from nameko_wamp.extensions import WampCalleeProxy, WampTopicProxy
from nameko_wamp.extensions.dependencies import (
WampCalleeProxy, WampTopicProxy)

logger = logging.getLogger(__name__)

Expand Down
Loading

0 comments on commit e7c510f

Please sign in to comment.