Skip to content

Commit

Permalink
3.0.0 pubLite removed, using liteAccess instead
Browse files Browse the repository at this point in the history
  • Loading branch information
ASukhanov committed Sep 25, 2023
1 parent c6d898f commit ba7df98
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 774 deletions.
11 changes: 6 additions & 5 deletions apstrim/__main__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Description = '''Serializer of Process Variables (from EPICS infrastructure)
or Data Objects from other infrastructures, e.g. ADO or LITE).'''
__version__ = '2.0.5 2021-08-25'
__version__ = '3.0.0 2023-09-24'# pubLITE removed, use_single_float removed

import sys, argparse
from .apstrim import apstrim, __version__
Expand All @@ -14,8 +14,8 @@ def main():
'Enable online compression')
#parser.add_argument('-D', '--dirSize', type=int, default=10240, help=\
#'Size of a directory section, set it to 0 to disable random access retrieval')
parser.add_argument('-d', '--doublePrecision', action='store_true', help=\
'Disable conversion of float64 to float32')
#parser.add_argument('-d', '--doublePrecision', action='store_true', help=\
#'Disable conversion of float64 to float32')
#parser.add_argument('-f', '--file', default=None, help=\
#'Configuration file')
parser.add_argument('-o', '--outfile', default='apstrim.aps', help=\
Expand All @@ -37,7 +37,7 @@ def main():

s = pargs.namespace.upper()
if s == 'LITE':
from .pubLITE import Access as publisher
from liteaccess import Access as publisher
elif s == 'EPICS':
from .pubEPICS import Access
publisher = Access()
Expand All @@ -58,7 +58,8 @@ def main():

apstrim.Verbosity = pargs.verbose
aps = apstrim(publisher, pvNames, pargs.sectionTime, compress=pargs.compress
, quiet=pargs.quiet, use_single_float=not pargs.doublePrecision)
#, quiet=pargs.quiet, use_single_float=not pargs.doublePrecision)
, quiet=pargs.quiet)
aps.start(pargs.outfile, howLong=pargs.acqTime)

txt = f'for {round(pargs.acqTime/60., 2)} m' if pargs.acqTime<99e6 else 'endlessly'
Expand Down
48 changes: 27 additions & 21 deletions apstrim/apstrim.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
#
# https://github.com/ASukhanov/apstrim/blob/main/LICENSE
#
__version__ = '2.0.5 2021-08-25'
__version__ = '3.0.0 2023-09-24'
#TODO: Check if all PVs are alive before logging started

import sys, time, string, copy
import os, pathlib, datetime
Expand All @@ -15,10 +16,17 @@
from timeit import default_timer as timer

import numpy as np
import msgpack
if msgpack.version < (1, 0, 2):

import msgpack as encoder
if encoder.version < (1, 0, 2):
print(f'MessagePack too old: {msgpack.version}')
sys.exit()
def encoderDump(buf):
return encoder.packb(buf, use_single_float=True)

# Try CBOR encoding
#import cbor2 as encoder
#encoderDump = encoder.dumps

#````````````````````````````Globals``````````````````````````````````````````
Nano = 0.000000001
Expand Down Expand Up @@ -121,8 +129,10 @@ class apstrim():
_eventStop = threading.Event()

def __init__(self, publisher, devPars:list, sectionInterval=60.
, compress=False, quiet=False, use_single_float=True, dirSize=10240):
#_printi(f'apstrim {__version__}, sectionInterval {sectionInterval}')
# , compress=False, quiet=False, use_single_float=True, dirSize=10240):
, compress=False, quiet=False, dirSize=10240):
_printi(f'apstrim {__version__}, sectionInterval {sectionInterval}')
print(f'v: {self.Verbosity}')
signal.signal(signal.SIGINT, _safeExit)
signal.signal(signal.SIGTERM, _safeExit)

Expand All @@ -131,15 +141,15 @@ def __init__(self, publisher, devPars:list, sectionInterval=60.
self.devPars = devPars
self.sectionInterval = sectionInterval
self.quiet = quiet
self.use_single_float = use_single_float
#self.use_single_float = use_single_float

# table of contents - related variables
self.dirSize = dirSize
self.contents_downsampling_factor = 1# No downsampling

# create a section Abstract
self.abstractSection = {'abstract':{'apstrim':__version__, 'msgpack':msgpack.version
, 'sectionInterval':sectionInterval}}
self.abstractSection = {'abstract':{'apstrim':__version__,
'encoder':encoder.__name__, 'sectionInterval':sectionInterval}}
abstract = self.abstractSection['abstract']

if compress:
Expand All @@ -164,14 +174,14 @@ def __init__(self, publisher, devPars:list, sectionInterval=60.
#for pname in self.par2Index.keys():
for pname in self.par2Index:
devPar = tuple(pname.rsplit(':',1))
_printd(f'Subscribing: {devPar}')
try:
self.publisher.subscribe(self._delivered, devPar)
except:# Exception as e:
_printe(f'Could not subscribe for {pname}')#: {e}')
continue

self.indexSection = msgpack.packb({'index':self.par2Index}
, use_single_float=self.use_single_float)
self.indexSection = encoderDump({'index':self.par2Index})

def start(self, fileName='apstrim.aps', howLong=99e6):
"""Start the streaming of the data objects to the logbook file.
Expand Down Expand Up @@ -199,14 +209,13 @@ def start(self, fileName='apstrim.aps', howLong=99e6):
# write a preliminary 'Table of contents' section
self.contentsSection = {'contents':{'size':self.dirSize}, 'data':{}}
self.dataContents = self.contentsSection['data']
self.logbook.write(msgpack.packb(self.contentsSection))
self.logbook.write(encoderDump(self.contentsSection))
# skip the 'Table of contents' zone of the logbook
self.logbook.seek(self.dirSize)

# write the sections Abstract and Index
_printd(f'write Abstract@{self.logbook.tell()}')
self.logbook.write(msgpack.packb(self.abstractSection
, use_single_float=self.use_single_float))
self.logbook.write(encoderDump(self.abstractSection))
_printd(f'write Index@{self.logbook.tell()}')
self.logbook.write(self.indexSection)
savedPos = self.logbook.tell()
Expand All @@ -229,7 +238,7 @@ def _delivered(self, *args):
"""Callback, specified in the subscribe() request.
Called when the requested data have been changed.
args is a map of delivered objects."""
#print(f'delivered: {args}')
#_printd(f'delivered: {args}')
#self.timestampedMap = {}
with self.lock:
for devPar,props in args[0].items():
Expand Down Expand Up @@ -316,8 +325,7 @@ def _serialize_sections(self):
if rf <=1 or (statistics[NSections]%rf) == 0:
self.dataContents[self.section['tstart']]\
= self.logbook.tell()
#print(f'contentsSection:{self.contentsSection}')
packed = msgpack.packb(self.contentsSection)
packed = encoderDump(self.contentsSection)
if len(packed) < self.dirSize:
self.packedContents = packed
else:
Expand All @@ -329,7 +337,7 @@ def _serialize_sections(self):
[::self.contents_downsampling_factor])
self.contentsSection['data'] = downsampled_contents
_printd(f'downsampled contentsSection:{self.contentsSection}')
self.packedContents = msgpack.packb(self.contentsSection)
self.packedContents = encoderDump(self.contentsSection)

# Update Directory section on the file.
currentPos = self.logbook.tell()
Expand Down Expand Up @@ -367,15 +375,13 @@ def _serialize_sections(self):
# msgpack
toPack = {'tstart':self.section['tstart']
,'tstart':self.section['tstart'],'pars':npPacked}
packed = msgpack.packb(toPack
, use_single_float=self.use_single_float)
packed = encoderDump(toPack)
statistics[BytesRaw] += len(packed)

# compress, takes almost no time.
if self.compress is not None:
compressed = self.compress(packed)
packed = msgpack.packb(compressed
, use_single_float=self.use_single_float)
packed = encoderDump(compressed)
statistics[BytesFinal] += len(packed)

# write to file
Expand Down
Loading

0 comments on commit ba7df98

Please sign in to comment.