Skip to content

Commit

Permalink
Remove old task:init code (SYN-7036) (#3639)
Browse files Browse the repository at this point in the history
  • Loading branch information
Cisphyx authored Mar 25, 2024
1 parent 216d402 commit b950902
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 287 deletions.
157 changes: 0 additions & 157 deletions synapse/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,78 +46,6 @@ def pack(self):
}
return ret

class Genr(s_share.Share):

typename = 'genr'

async def _runShareLoop(self):

try:

for item in self.item:

if self.isfini:
break

retn = (True, item)
mesg = ('share:data', {'share': self.iden, 'data': retn})

await self.link.tx(mesg)

# purposely yield for fair scheduling
await asyncio.sleep(0)

except Exception as e:

retn = s_common.retnexc(e)
mesg = ('share:data', {'share': self.iden, 'data': retn})
await self.link.tx(mesg)

finally:

mesg = ('share:data', {'share': self.iden, 'data': None})
await self.link.tx(mesg)
await self.fini()


class AsyncGenr(s_share.Share):

typename = 'genr'

async def _runShareLoop(self):

try:

async for item in self.item:

if self.isfini:
break

retn = (True, item)
mesg = ('share:data', {'share': self.iden, 'data': retn})

await self.link.tx(mesg)

# purposely yield for fair scheduling
await asyncio.sleep(0)

except Exception as e:
retn = s_common.retnexc(e)
mesg = ('share:data', {'share': self.iden, 'data': retn})
await self.link.tx(mesg)

finally:

mesg = ('share:data', {'share': self.iden, 'data': None})
await self.link.tx(mesg)
await self.fini()

dmonwrap = (
(s_coro.GenrHelp, AsyncGenr),
(types.AsyncGeneratorType, AsyncGenr),
(types.GeneratorType, Genr),
)

async def t2call(link, meth, args, kwargs):
'''
Call the given ``meth(*args, **kwargs)`` and handle the response to provide
Expand Down Expand Up @@ -222,8 +150,6 @@ async def __anit__(self, certdir=None, ahainfo=None):

await s_base.Base.__anit__(self)

self._shareLoopTasks = set()

if certdir is None:
certdir = s_certdir.getCertDir()

Expand All @@ -242,7 +168,6 @@ async def __anit__(self, certdir=None, ahainfo=None):

self.mesgfuncs = {
'tele:syn': self._onTeleSyn,
'task:init': self._onTaskInit,
'share:fini': self._onShareFini,

# task version 2 API
Expand Down Expand Up @@ -478,29 +403,6 @@ async def sessfini():

await link.tx(reply)

async def _runTodoMeth(self, link, meth, args, kwargs):

valu = meth(*args, **kwargs)

for wraptype, wrapctor in dmonwrap:
if isinstance(valu, wraptype):
return await wrapctor.anit(link, valu)

if s_coro.iscoro(valu):
valu = await valu

return valu

def _getTaskFiniMesg(self, task, valu):

if not isinstance(valu, s_share.Share):
retn = (True, valu)
return ('task:fini', {'task': task, 'retn': retn})

retn = (True, valu.iden)
typename = valu.typename
return ('task:fini', {'task': task, 'retn': retn, 'type': typename})

async def _onTaskV2Init(self, link, mesg):

# t2:init is used by the pool sockets on the client
Expand Down Expand Up @@ -543,62 +445,3 @@ async def _onTaskV2Init(self, link, mesg):
if not link.isfini:
retn = s_common.retnexc(e)
await link.tx(('t2:fini', {'retn': retn}))

async def _onTaskInit(self, link, mesg):

task = mesg[1].get('task')
name = mesg[1].get('name')

sess = link.get('sess')
if sess is None:
raise s_exc.NoSuchObj(name=name)

item = sess.getSessItem(name)
if item is None:
raise s_exc.NoSuchObj(name=name)

try:

methname, args, kwargs = mesg[1].get('todo')

if methname[0] == '_':
raise s_exc.NoSuchMeth(name=methname)

meth = getattr(item, methname, None)
if meth is None:
logger.warning('%r has no method: %s', item, methname)
raise s_exc.NoSuchMeth(name=methname)

valu = await self._runTodoMeth(link, meth, args, kwargs)

mesg = self._getTaskFiniMesg(task, valu)

await link.tx(mesg)

# if it's a Share(), spin off the share loop
if isinstance(valu, s_share.Share):

if isinstance(item, s_base.Base):
item.onfini(valu)

async def spinshareloop():
try:
await valu._runShareLoop()
except asyncio.CancelledError:
pass
except Exception:
logger.exception('Error running %r', valu)
finally:
await valu.fini()

self.schedCoro(spinshareloop())

except (asyncio.CancelledError, Exception) as e:

logger.exception('on task:init: %r', mesg)

retn = s_common.retnexc(e)

await link.tx(
('task:fini', {'task': task, 'retn': retn})
)
78 changes: 5 additions & 73 deletions synapse/telepath.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,23 +325,6 @@ async def getTeleApi(self, link, mesg, path):
def onTeleShare(self, dmon, name):
pass

class Task:
'''
A telepath Task is used to internally track calls/responses.
'''
def __init__(self):
self.retn = None
self.iden = s_common.guid()
self.done = asyncio.Event()

async def result(self):
await self.done.wait()
return self.retn

def reply(self, retn):
self.retn = retn
self.done.set()

class Share(s_base.Base):
'''
The telepath client side of a dynamically shared object.
Expand Down Expand Up @@ -594,7 +577,6 @@ async def __anit__(self, link, name):
self.link = link
self.name = name

self.tasks = {}
self.shares = {}

self._ahainfo = {}
Expand All @@ -609,7 +591,6 @@ async def __anit__(self, link, name):
self.syndone = asyncio.Event()

self.handlers = {
'task:fini': self._onTaskFini,
'share:data': self._onShareData,
'share:fini': self._onShareFini,
}
Expand All @@ -619,11 +600,6 @@ async def fini():
for item in list(self.shares.values()):
await item.fini()

mesg = ('task:fini', {'retn': (False, ('IsFini', {}))})
for name, task in list(self.tasks.items()):
task.reply(mesg)
del self.tasks[name]

for link in self.links:
await link.fini()

Expand Down Expand Up @@ -801,7 +777,10 @@ async def call(self, methname, *args, **kwargs):
todo = (methname, args, kwargs)
return await self.task(todo)

async def taskv2(self, todo, name=None):
async def task(self, todo, name=None):

if self.isfini:
raise s_exc.IsFini(mesg='Telepath Proxy isfini')

mesg = ('t2:init', {
'todo': todo,
Expand Down Expand Up @@ -831,7 +810,7 @@ async def genrloop():

mesg = await link.rx()
if mesg is None:
raise s_exc.LinkShutDown(mesg=mesg)
raise s_exc.LinkShutDown(mesg='Remote peer disconnected')

if mesg[0] != 't2:yield': # pragma: no cover
info = 'Telepath protocol violation: unexpected message received'
Expand Down Expand Up @@ -860,32 +839,6 @@ async def genrloop():
await self._putPoolLink(link)
return await Share.anit(self, iden, sharinfo)

async def task(self, todo, name=None):

if self.isfini:
raise s_exc.IsFini(mesg='Telepath Proxy isfini')

if self.sess is not None:
return await self.taskv2(todo, name=name)

task = Task()

mesg = ('task:init', {
'task': task.iden,
'todo': todo,
'name': name, })

self.tasks[task.iden] = task

try:

await self.link.tx(mesg)
retn = await task.result()
return s_common.result(retn)

finally:
self.tasks.pop(task.iden, None)

async def handshake(self, auth=None):

mesg = ('tele:syn', {
Expand Down Expand Up @@ -946,27 +899,6 @@ async def _txShareExc(self, iden):
('share:fini', {'share': iden, 'isexc': True})
)

async def _onTaskFini(self, mesg):

# handle task:fini message
iden = mesg[1].get('task')

task = self.tasks.pop(iden, None)
if task is None:
logger.warning('task:fini for invalid task: %r' % (iden,))
return

retn = mesg[1].get('retn')
type = mesg[1].get('type')

if type is None:
return task.reply(retn)

ctor = sharetypes.get(type, Share)
item = await ctor.anit(self, retn[1])

return task.reply((True, item))

def __getattr__(self, name):

info = self.methinfo.get(name)
Expand Down
Loading

0 comments on commit b950902

Please sign in to comment.