Skip to content

Commit

Permalink
added save/get files to crossbar
Browse files Browse the repository at this point in the history
  • Loading branch information
atiderko committed Dec 21, 2023
1 parent 6a60734 commit db5c936
Show file tree
Hide file tree
Showing 4 changed files with 398 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
# POSSIBILITY OF SUCH DAMAGE.

import json
import os


class RosPackage:
Expand All @@ -43,12 +44,12 @@ def __str__(self):


class PathItem:
'''
"""
:param str path: absolute path of the file or directory
:param float mtime: time of last modification of path. The return value is a number giving the number of seconds since the epoch
:param int size: size, in bytes, of path
:param str path_type: one of types {file, dir, symlink, package}
'''
"""

def __init__(self, path: str, mtime: float, size: int, path_type: str) -> None:
self.path = path
Expand All @@ -61,15 +62,22 @@ def __str__(self):


class LogPathItem:
'''
"""
:param str node: complete node name
:param str screen_log: the absolute path to the screen log file.
:param bool screen_log_exists: False if the file does not exists.
:param str ros_log: the absolute path to the ros log file.
:param bool ros_log_exists: False if the file does not exists.
'''
"""

def __init__(self, node: str, screen_log: str = '', screen_log_exists: bool = False, ros_log: str = '', ros_log_exists: bool = False) -> None:
def __init__(
self,
node: str,
screen_log: str = "",
screen_log_exists: bool = False,
ros_log: str = "",
ros_log_exists: bool = False,
) -> None:
self.node = node
self.screen_log = screen_log
self.screen_log_exists = screen_log_exists
Expand All @@ -78,3 +86,31 @@ def __init__(self, node: str, screen_log: str = '', screen_log_exists: bool = Fa

def __str__(self):
return json.dumps(dict(self), ensure_ascii=False)


class FileItem:
"""
:param str path: absolute path of the file or directory
:param float mtime: time of last modification of path. The return value is a number giving the number of seconds since the epoch
:param int size: size, in bytes, of path
:param str value: content of the file
"""

def __init__(
self,
path: str,
mtime: float = 0,
size: int = 0,
value: str = "",
encoding="utf-8",
) -> None:
self.path = path
self.fileName = os.path.split(path)[-1]
self.mtime = mtime
self.size = size
self.extension = path.rsplit(".", 1)[-1]
self.value = value
self.encoding = encoding

def __str__(self):
return json.dumps(dict(self), ensure_ascii=False)
166 changes: 122 additions & 44 deletions fkie_node_manager_daemon/fkie_node_manager_daemon/file_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@
# limitations under the License.


import glob
from io import FileIO
import os
import re

import asyncio
from autobahn import wamp
import json
import os
from types import SimpleNamespace
from typing import List
from fkie_multimaster_pylib import ros_pkg
from fkie_multimaster_pylib.crossbar.base_session import CrossbarBaseSession
from fkie_multimaster_pylib.crossbar.base_session import SelfEncoder
from fkie_multimaster_pylib.crossbar.file_interface import FileItem
from fkie_multimaster_pylib.crossbar.file_interface import RosPackage
from fkie_multimaster_pylib.crossbar.file_interface import PathItem
from fkie_multimaster_pylib.crossbar.file_interface import LogPathItem
Expand All @@ -39,122 +40,199 @@


class FileServicer(CrossbarBaseSession):

FILE_CHUNK_SIZE = 1024

def __init__(self, loop: asyncio.AbstractEventLoop, realm: str = 'ros', port: int = 11911):
def __init__(
self, loop: asyncio.AbstractEventLoop, realm: str = "ros", port: int = 11911
):
Log.info("Create ROS2 file manager servicer")
CrossbarBaseSession.__init__(self, loop, realm, port)
# TODO: clear cache after detected change or time?
self.CB_DIR_CACHE = {}

def stop(self):
'''
'''
""" """
self.shutdown()

@wamp.register('ros.packages.get_list')
@wamp.register("ros.packages.get_list")
def getPackageList(self, clear_cache: bool = False) -> List[RosPackage]:
Log.info(
f"{self.__class__.__name__}: Request to [ros.packages.get_list]")
Log.info(f"{self.__class__.__name__}: Request to [ros.packages.get_list]")
clear_cache = False
if clear_cache:
try:
from roslaunch import substitution_args
import rospkg

substitution_args._rospack = rospkg.RosPack()
except Exception as err:
Log.warn(
f"{self.__class__.__name__}: Cannot reset package cache: {err}")
f"{self.__class__.__name__}: Cannot reset package cache: {err}"
)
package_list: List[RosPackage] = []
# fill the input fields
ret = ros_pkg.get_packages(None)
for name, path in ret.items():
package = RosPackage(
name=name, path=os.path.join(path, 'share', name))
package = RosPackage(name=name, path=os.path.join(path, "share", name))
package_list.append(package)
return json.dumps(package_list, cls=SelfEncoder)

@wamp.register('ros.path.get_log_paths')
@wamp.register("ros.path.get_log_paths")
def getLogPaths(self, nodes: List[str]) -> List[LogPathItem]:
Log.info(
f"{self.__class__.__name__}: Request to [ros.path.get_log_paths] for {nodes}")
f"{self.__class__.__name__}: Request to [ros.path.get_log_paths] for {nodes}"
)
result = []
for node in nodes:
namespace = None
node_name = node

namespace_search = re.search('/(.*)/', node_name)
namespace_search = re.search("/(.*)/", node_name)
if namespace_search is not None:
namespace = f'/{namespace_search.group(1)}'
node_name = node.replace(f'/{namespace}/', '')
namespace = f"/{namespace_search.group(1)}"
node_name = node.replace(f"/{namespace}/", "")

screen_log = get_logfile(
node=node_name, for_new_screen=True, namespace=namespace)
node=node_name, for_new_screen=True, namespace=namespace
)
ros_log = get_ros_logfile(node)
log_path_item = LogPathItem(node,
screen_log=screen_log,
screen_log_exists=os.path.exists(
screen_log),
ros_log=ros_log,
ros_log_exists=os.path.exists(ros_log))
log_path_item = LogPathItem(
node,
screen_log=screen_log,
screen_log_exists=os.path.exists(screen_log),
ros_log=ros_log,
ros_log_exists=os.path.exists(ros_log),
)
result.append(log_path_item)
return json.dumps(result, cls=SelfEncoder)

@wamp.register('ros.path.get_list')
@wamp.register("ros.path.get_list")
def getPathList(self, inputPath: str) -> List[PathItem]:
Log.info(
f"{self.__class__.__name__}: Request to [ros.path.get_list] for {inputPath}")
f"{self.__class__.__name__}: Request to [ros.path.get_list] for {inputPath}"
)
path_list: List[PathItem] = []
# list the path
dirlist = os.listdir(inputPath)
for cfile in dirlist:
path = os.path.normpath('%s%s%s' % (inputPath, os.path.sep, cfile))
path = os.path.normpath("%s%s%s" % (inputPath, os.path.sep, cfile))
if os.path.isfile(path):
path_list.append(PathItem(path=path, mtime=os.path.getmtime(
path), size=os.path.getsize(path), path_type='file'))
path_list.append(
PathItem(
path=path,
mtime=os.path.getmtime(path),
size=os.path.getsize(path),
path_type="file",
)
)
elif path in self.CB_DIR_CACHE:
path_list.append(PathItem(path=path, mtime=os.path.getmtime(
path), size=os.path.getsize(path), path_type=self.CB_DIR_CACHE[path]))
path_list.append(
PathItem(
path=path,
mtime=os.path.getmtime(path),
size=os.path.getsize(path),
path_type=self.CB_DIR_CACHE[path],
)
)
elif os.path.isdir(path):
try:
fileList = os.listdir(path)
file_type = None
if ros_pkg.is_package(fileList):
file_type = 'package'
file_type = "package"
else:
file_type = 'dir'
file_type = "dir"
self.CB_DIR_CACHE[path] = file_type
path_list.append(PathItem(path=path, mtime=os.path.getmtime(
path), size=os.path.getsize(path), path_type=file_type))
path_list.append(
PathItem(
path=path,
mtime=os.path.getmtime(path),
size=os.path.getsize(path),
path_type=file_type,
)
)
except Exception as _:
pass
return json.dumps(path_list, cls=SelfEncoder)

def _glob(self, inputPath: str, recursive: bool = True, withHidden: bool = False, filter: List[str] = []) -> List[PathItem]:
def _glob(
self,
inputPath: str,
recursive: bool = True,
withHidden: bool = False,
filter: List[str] = [],
) -> List[PathItem]:
path_list: List[PathItem] = []
dir_list: List[str] = []
for name in os.listdir(inputPath):
if not withHidden and name.startswith('.'):
if not withHidden and name.startswith("."):
continue
filename = os.path.join(inputPath, name)
if os.path.isfile(filename):
path_list.append(PathItem(path=filename, mtime=os.path.getmtime(
filename), size=os.path.getsize(filename), path_type='file'))
path_list.append(
PathItem(
path=filename,
mtime=os.path.getmtime(filename),
size=os.path.getsize(filename),
path_type="file",
)
)
elif os.path.isdir(filename) and recursive:
if name not in filter:
dir_list.append(filename)
# glob the directories at the end
for filename in dir_list:
path_list.extend(self._glob(
inputPath=filename, recursive=recursive, withHidden=withHidden, filter=filter))
path_list.extend(
self._glob(
inputPath=filename,
recursive=recursive,
withHidden=withHidden,
filter=filter,
)
)
return path_list

@wamp.register('ros.path.get_list_recursive')
@wamp.register("ros.path.get_list_recursive")
def getPathListRecursive(self, inputPath: str) -> List[PathItem]:
Log.info(
f"{self.__class__.__name__}: Request to [ros.path.get_list_recursive] for {inputPath}")
f"{self.__class__.__name__}: Request to [ros.path.get_list_recursive] for {inputPath}"
)
path_list: List[PathItem] = self._glob(
inputPath, recursive=True, withHidden=False, filter=['node_modules'])
inputPath, recursive=True, withHidden=False, filter=["node_modules"]
)

return json.dumps(path_list, cls=SelfEncoder)

@wamp.register("ros.file.get")
def getFileContent(self, requestPath: str) -> FileItem:
Log.info("Request to [ros.file.get] for %s" % requestPath)
with FileIO(requestPath, "r") as outfile:
mTime = os.path.getmtime(requestPath)
fSize = os.path.getsize(requestPath)
content = outfile.readall()
encoding = "utf-8"
try:
content = content.decode(encoding)
except:
content = content.hex()
encoding = "hex"
return json.dumps(
FileItem(requestPath, mTime, fSize, content, encoding), cls=SelfEncoder
)

@wamp.register("ros.file.save")
def saveFileContent(self, request_json: FileItem) -> int:
# Covert input dictionary into a proper python object
file = json.loads(
json.dumps(request_json), object_hook=lambda d: SimpleNamespace(**d)
)
Log.info("Request to [ros.file.save] for %s" % file.path)
with FileIO(file.path, "w+") as outfile:
content = file.value
if file.encoding == "utf-8":
content = content.encode("utf-8")
elif file.encoding == "hex":
content = bytes.fromhex(content)
else:
raise TypeError(f"unknown encoding {file.encoding}")
bytesWritten = outfile.write(content)
return json.dumps(bytesWritten, cls=SelfEncoder)
Loading

0 comments on commit db5c936

Please sign in to comment.