Skip to content

Commit

Permalink
More robust candidate broadcasting (#898)
Browse files Browse the repository at this point in the history
  • Loading branch information
robertdstein authored May 18, 2024
1 parent 14f0ef8 commit 7d7b08f
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 30 deletions.
17 changes: 11 additions & 6 deletions mirar/catalog/kowalski/base_kowalski_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ class KowalskiError(ProcessorError):


PROTOCOL, HOST, PORT = "https", "kowalski.caltech.edu", 443
KOWALSKI_TIMEOUT = 300.0

kowalski_args = {
"protocol": PROTOCOL,
"host": HOST,
"port": PORT,
"verbose": False,
"timeout": KOWALSKI_TIMEOUT,
}


def get_kowalski() -> Kowalski:
Expand All @@ -34,9 +43,7 @@ def get_kowalski() -> Kowalski:
if token_kowalski is not None:
logger.debug("Using kowalski token")

kowalski_instance = Kowalski(
token=token_kowalski, protocol=PROTOCOL, host=HOST, port=PORT
)
kowalski_instance = Kowalski(token=token_kowalski, **kowalski_args)

else:
username_kowalski = os.getenv("KOWALSKI_USER")
Expand All @@ -61,9 +68,7 @@ def get_kowalski() -> Kowalski:
kowalski_instance = Kowalski(
username=username_kowalski,
password=password_kowalski,
protocol=PROTOCOL,
host=HOST,
port=PORT,
**kowalski_args,
)

if not kowalski_instance.ping():
Expand Down
12 changes: 9 additions & 3 deletions mirar/pipelines/winter/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@
CustomSourceTableModifier,
ForcedPhotometryDetector,
SourceBatcher,
SourceDebatcher,
SourceLoader,
SourceWriter,
ZOGYSourceDetector,
Expand Down Expand Up @@ -710,7 +709,6 @@
]

crossmatch_candidates = [
SourceDebatcher(),
XMatch(catalog=TMASS(num_sources=3, search_radius_arcmin=0.5)),
XMatch(catalog=PS1(num_sources=3, search_radius_arcmin=0.5)),
XMatch(catalog=PS1SGSc(num_sources=3, search_radius_arcmin=0.5)),
Expand Down Expand Up @@ -814,10 +812,18 @@

load_avro = [SourceLoader(input_dir_name="preavro")]

load_skyportal = [SourceLoader(input_dir_name="preskyportal")]
load_skyportal = [
SourceLoader(input_dir_name="preskyportal"),
SourceBatcher(BASE_NAME_KEY),
]

send_to_skyportal = [
SkyportalCandidateUploader(**winter_fritz_config),
HeaderEditor(edit_keys="sent", values=True),
DatabaseSourceInserter(
db_table=Candidate,
duplicate_protocol="replace",
),
]

# To make a mosaic by stacking all boards
Expand Down
13 changes: 13 additions & 0 deletions mirar/pipelines/winter/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,17 @@
"stream_id": 1005,
"update_thumbnails": True,
"skyportal_client": SkyportalClient(base_url="https://fritz.science/api/"),
"annotation_keys": [
"rb",
"chipsf",
"fwhm",
"scorr",
"nneg",
"mindtoedge",
"diffmaglim",
"distpsnr1",
"sgmag1",
"srmag1",
"simag1",
],
}
40 changes: 19 additions & 21 deletions mirar/processors/skyportal/skyportal_candidate.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ def __init__(
*args,
stream_id: int,
fritz_filter_id: int,
annotation_keys: list[str] | None = None,
**kwargs,
):
super().__init__(*args, **kwargs)
self.stream_id = stream_id
self.fritz_filter_id = fritz_filter_id
self.annotation_keys = annotation_keys

def skyportal_post_candidate(self, alert):
"""
Expand Down Expand Up @@ -65,29 +67,29 @@ def skyportal_post_candidate(self, alert):
)
logger.error(response.json())

def get_annotations(self, alert) -> dict:
"""
Retrieve annotations from alert data.
:param alert: Alert data
:return: Annotations
"""
data = {}

if self.annotation_keys is not None:
for key in self.annotation_keys:
if key in alert:
data[key] = alert[key]
return data

def skyportal_post_annotation(self, alert):
"""
Post an annotation. Works for both candidates and sources.
:param alert: alert data
:return: None
"""
data = {}

for key in [
"chipsf",
"fwhm",
"scorr",
"nneg",
"mindtoedge",
"diffmaglim",
"distpsnr1",
"sgmag1",
"srmag1",
"simag1",
]:
if key in alert:
data[key] = alert[key]
data = self.get_annotations(alert)

payload = {"origin": self.origin, "data": data, "group_ids": self.group_ids}

Expand Down Expand Up @@ -137,11 +139,7 @@ def skyportal_put_annotation(self, source):
# annotation from this(WNTR) origin exists
else:
# annotation data
data = {
"fwhm": source["fwhm"],
"scorr": source["scorr"],
"chipsf": source["chipsf"],
}
data = self.get_annotations(source)
new_annotation = {
"author_id": existing_annotations[self.origin]["author_id"],
"obj_id": source[SOURCE_NAME_KEY],
Expand Down
2 changes: 2 additions & 0 deletions mirar/processors/xmatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ class XMatch(BaseSourceProcessor):
Class to cross-match a candidate_table to a catalog
"""

max_n_cpu = 4

base_key = "XMATCH"

def __init__(
Expand Down

0 comments on commit 7d7b08f

Please sign in to comment.