Skip to content

Commit

Permalink
asn1_over_soup: start
Browse files Browse the repository at this point in the history
  • Loading branch information
SamDanielThangarajan committed Dec 11, 2024
1 parent 8820d8a commit 4e201df
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 1 deletion.
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ classifiers = [
dependencies = [
'attrs>=23.1',
'chevron>=0.14.0',
'click>=8.1'
'click>=8.1',
'asn1>=2.6',
'asn1tools==0.164.0'
]
dynamic = ["version"]

Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
attrs>=23.1
chevron>=0.14.0
click>=8.1
asn1>=2.6
asn1tools==0.164.0
pytest>=7.4
pytest-cov>=4.1
pytest-asyncio>=0.23
Empty file.
156 changes: 156 additions & 0 deletions src/nasdaq_protocols/common/asn1_message/core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
import importlib.resources
from typing import Mapping, Any, Union, Tuple

import attr
import asn1tools
from nasdaq_protocols.common.utils import Printable
from nasdaq_protocols.common.utils import add_logger, kwargs_contains


@add_logger
class Asn1Spec:
SpecMap = {}
SpecProviderClass = {}

@kwargs_contains(['spec_name'])
def __init_subclass__(cls, **kwargs):
Asn1Spec.SpecProviderClass[kwargs['spec_name']] = cls

@staticmethod
def spec(spec_name):
if spec_name in Asn1Spec.SpecMap:
return Asn1Spec.SpecMap[spec_name]

if spec_name in Asn1Spec.SpecProviderClass:
spec_dir = Asn1Spec.SpecProviderClass[spec_name].spec_dir()
files = list(importlib.resources.files(spec_dir).iterdir())
Asn1Spec.SpecMap[spec_name] = Asn1Spec.asn1_compile(files)

try:
return Asn1Spec.SpecMap[spec_name]
except KeyError:
Asn1Spec.Log(f'{spec_name} is not loaded')
return None

@staticmethod
def asn1_compile(files):
return asn1tools.compiler.compile_files(files)


@attr.s(auto_attribs=True)
class Asn1Type(Printable):
TypeMap = {}
Fields = []
Hint = ''
Type = ''

_data: Union[Mapping[str, Any], Tuple[int, Any]]

def __init_subclass__(cls, **kwargs):
if 'type' in kwargs:
Asn1Type.TypeMap[kwargs['type']] = cls
cls.Type = kwargs['type']
if 'hint' in kwargs:
cls.Hint = kwargs['hint']
Asn1Type.TypeMap[cls.__name__] = cls

@classmethod
def resolve_type(cls, type_name: str):
return Asn1Type.TypeMap.get(type_name, None)

@classmethod
def hint(cls):
return cls.Hint

@classmethod
def get_type(cls):
return cls.Type

def as_collection(self):
return Asn1Type._json_compatible_collection(self._data)

def __str__(self):
return self.as_json_string(indent=4)

@staticmethod
def _json_compatible_collection(data):
if isinstance(data, tuple):
if isinstance(data[0], (bytearray, bytes)):
return f'{data[0]}'
return {data[0]: Asn1Type._json_compatible_collection(data[1])}
if isinstance(data, bytes):
return str(data)
if isinstance(data, dict):
return {k: Asn1Type._json_compatible_collection(v) for k, v in data.items()}
if isinstance(data, list):
return [Asn1Type._json_compatible_collection(_) for _ in data]
return data


@attr.s(auto_attribs=True)
class Asn1Enum(Asn1Type, context='BASIC', type='ENUMERATED', hint='str'):
pass


@attr.s(auto_attribs=True)
class Asn1Integer(Asn1Type, type='INTEGER', hint='int'):
pass


@attr.s(auto_attribs=True)
class Asn1Real(Asn1Type, type='REAL', hint='float'):
pass


@attr.s(auto_attribs=True)
class Asn1OctetString(Asn1Type, type='OCTET STRING', hint='bytes'):
pass


@attr.s(auto_attribs=True)
class Asn1BitString(Asn1Type, type='BIT STRING', hint='bytes'):
pass


@attr.s(auto_attribs=True)
class Asn1Boolean(Asn1Type, type='BOOLEAN', hint='bool'):
pass


class Asn1Sequence(Asn1Type, type='SEQUENCE', hint='dict'):
def get_attr(self, item):
if item == '_data':
return self.__dict__[item]

item = item.strip('_')
if item in self._data:
return self.__class__.Fields[item](self._data[item])
return None


class Asn1SequenceOf(Asn1Sequence, type='SEQUENCE OF', hint='dict'):
pass


class Asn1Choice(Asn1Type, type='CHOICE', hint='tuple'):
def get_attr(self, item):
if item == '_data':
return self.__dict__[item]

item = item.strip('_')

if item == self._data[0]:
return self.__class__.Fields[item](self._data[1])
return None

def get_data(self):
field_name = self._data[0]
return self.__class__.Fields[field_name](self._data[1])


class Asn1Set(Asn1Sequence, type='SET', hint='dict'):
pass


class Asn1SetOf(Asn1Sequence, type='SET OF', hint='dict'):
pass

0 comments on commit 4e201df

Please sign in to comment.