Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix OpalServer-EvenBroadcaster integration & other OpalServer refactors #632

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
1 change: 0 additions & 1 deletion documentation/docs/getting-started/configuration.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ Please use this table as a reference.
| POLICY_BUNDLE_TMP_PATH | Path for temp policy file. It needs to be writable. | |
| POLICY_BUNDLE_GIT_ADD_PATTERN | File pattern to add files to all the git default files. | |
| REPO_WATCHER_ENABLED | | |
| PUBLISHER_ENABLED | | |
| BROADCAST_KEEPALIVE_INTERVAL | The time to wait between sending two consecutive broadcaster keepalive messages. | |
| BROADCAST_KEEPALIVE_TOPIC | The topic on which we should send broadcaster keepalive messages. | |
| MAX_CHANNELS_PER_CLIENT | Max number of records per client, after this number it will not be added to statistics, relevant only if `STATISTICS_ENABLED`. | |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ def setup_server(event):
# Server without git watcher and with a test specific url for data, and without broadcasting
server = OpalServer(
init_policy_watcher=False,
init_publisher=False,
data_sources_config=DATA_SOURCES_CONFIG,
broadcaster_uri=None,
enable_jwks_endpoint=False,
Expand Down
13 changes: 12 additions & 1 deletion packages/opal-common/opal_common/async_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,24 @@ def __init__(self):
self._tasks: List[asyncio.Task] = []

def _cleanup_task(self, done_task):
self._tasks.remove(done_task)
try:
self._tasks.remove(done_task)
except KeyError:
...

def add_task(self, f):
t = asyncio.create_task(f)
self._tasks.append(t)
t.add_done_callback(self._cleanup_task)

async def join(self, cancel=False):
if cancel:
for t in self._tasks:
if not t.done():
t.cancel()
await asyncio.gather(*self._tasks, return_exceptions=True)
self._tasks.clear()


async def repeated_call(
func: Coroutine,
Expand Down
2 changes: 1 addition & 1 deletion packages/opal-common/opal_common/confi/confi.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def wrapped_cast(value, *args, **kwargs):
return wrapped_cast


def load_conf_if_none(variable, conf):
def load_conf_if_none(variable: Any, conf: Any):
if variable is None:
return conf
else:
Expand Down
208 changes: 0 additions & 208 deletions packages/opal-common/opal_common/topics/publisher.py

This file was deleted.

3 changes: 0 additions & 3 deletions packages/opal-server/opal_server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,6 @@ class OpalServerConfig(Confi):

REPO_WATCHER_ENABLED = confi.bool("REPO_WATCHER_ENABLED", True)

# publisher
PUBLISHER_ENABLED = confi.bool("PUBLISHER_ENABLED", True)

# broadcaster keepalive
BROADCAST_KEEPALIVE_INTERVAL = confi.int(
"BROADCAST_KEEPALIVE_INTERVAL",
Expand Down
21 changes: 7 additions & 14 deletions packages/opal-server/opal_server/data/data_update_publisher.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
import asyncio
import os
from typing import List
from typing import List, Union

from fastapi_utils.tasks import repeat_every
from opal_common.logger import logger
from opal_common.schemas.data import (
DataSourceEntryWithPollingInterval,
DataUpdate,
ServerDataSourceConfig,
)
from opal_common.topics.publisher import TopicPublisher
from opal_common.schemas.data import DataUpdate
from opal_server.pubsub import PubSub
from opal_server.scopes.scoped_pubsub import ScopedPubSub

TOPIC_DELIMITER = "/"
PREFIX_DELIMITER = ":"


class DataUpdatePublisher:
def __init__(self, publisher: TopicPublisher) -> None:
self._publisher = publisher
def __init__(self, pubsub: Union[PubSub, ScopedPubSub]) -> None:
self._pubsub = pubsub

@staticmethod
def get_topic_combos(topic: str) -> List[str]:
Expand Down Expand Up @@ -108,6 +103,4 @@ async def publish_data_updates(self, update: DataUpdate):
entries=logged_entries,
)

await self._publisher.publish(
list(all_topic_combos), update.dict(by_alias=True)
)
await self._pubsub.publish(list(all_topic_combos), update.dict(by_alias=True))
9 changes: 3 additions & 6 deletions packages/opal-server/opal_server/policy/watcher/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
PolicyUpdateMessage,
PolicyUpdateMessageNotification,
)
from opal_common.topics.publisher import TopicPublisher
from opal_common.topics.utils import policy_topics
from opal_server.pubsub import PubSub


async def create_update_all_directories_in_repo(
Expand Down Expand Up @@ -104,7 +104,7 @@ def is_path_affected(path: Path) -> bool:
async def publish_changed_directories(
old_commit: Commit,
new_commit: Commit,
publisher: TopicPublisher,
pubsub: PubSub,
file_extensions: Optional[List[str]] = None,
bundle_ignore: Optional[List[str]] = None,
):
Expand All @@ -116,7 +116,4 @@ async def publish_changed_directories(
)

if notification:
async with publisher:
await publisher.publish(
topics=notification.topics, data=notification.update.dict()
)
await pubsub.publish_sync(notification.topics, notification.update.dict())
Loading
Loading