Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-43090: next-visit-fan-out for LSSTComCamSim for dev #11

Merged
merged 3 commits into from
Feb 29, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 27 additions & 9 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@ async def main() -> None:
offset = os.environ["OFFSET"]
kafka_schema_registry_url = os.environ["KAFKA_SCHEMA_REGISTRY_URL"]
latiss_knative_serving_url = os.environ["LATISS_KNATIVE_SERVING_URL"]
hsinfang marked this conversation as resolved.
Show resolved Hide resolved
lsst_cam_knative_serving_url = os.environ["LSST_CAM_KNATIVE_SERVING_URL"]
lsstcomcam_knative_serving_url = os.environ["LSSTCOMCAM_KNATIVE_SERVING_URL"]
Copy link
Member

@kfindeisen kfindeisen Feb 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we already have a URL defined in the Phalanx config, but this makes me a bit nervous -- trying to support all the instruments in advance has caused us a lot of unnecessary maintenance on the Phalanx side...

Copy link
Collaborator Author

@hsinfang hsinfang Feb 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I share your concern. Maybe we should refactor next_visit_fan_out a bit, but don't have a plan right now.

lsstcomcamsim_knative_serving_url = os.environ["LSSTCOMCAMSIM_KNATIVE_SERVING_URL"]
lsstcam_knative_serving_url = os.environ["LSSTCAM_KNATIVE_SERVING_URL"]
hsc_knative_serving_url = os.environ["HSC_KNATIVE_SERVING_URL"]

# kafka auth
Expand All @@ -176,8 +178,8 @@ async def main() -> None:

# list based on keys in config. Data class
latiss_active_detectors = detector_load(conf, "LATISS")
lsst_com_cam_active_detectors = detector_load(conf, "LSSTComCam")
lsst_cam_active_detectors = detector_load(conf, "LSSTCam")
lsstcomcam_active_detectors = detector_load(conf, "LSSTComCam")
lsstcam_active_detectors = detector_load(conf, "LSSTCam")
hsc_active_detectors = detector_load(conf, "HSC")
# These four groups are for the small dataset used in the upload.py test
hsc_active_detectors_59134 = detector_load(conf, "HSC-TEST-59134")
Expand Down Expand Up @@ -209,6 +211,10 @@ async def main() -> None:
"lsstcomcam_next_visit_messages",
"next visit nessages with lsstcomcam as instrument",
)
lsstcomcamsim_gauge = Gauge(
"lsstcomcamsim_next_visit_messages",
"next visit nessages with lsstcomcamsim as instrument",
)
hsc_gauge = Gauge(
"hsc_next_visit_messages", "next visit nessages with hsc as instrument"
)
Expand All @@ -232,6 +238,11 @@ async def main() -> None:
"lsstcomcam in process requests for next visit",
)

lsstcomcamsim_in_process_requests_gauge = Gauge(
"lsstcomcamsim_prompt_processing_in_process_requests",
"lsstcomcamsim in process requests for next visit",
)

await consumer.start()

tasks = set()
Expand Down Expand Up @@ -305,19 +316,26 @@ async def main() -> None:
)
knative_serving_url = latiss_knative_serving_url
in_process_requests_gauge = latiss_in_process_requests_gauge
# case "LSSTComCam":
# fan_out_message_list = next_visit_message.add_detectors(
# next_visit_message, lsst_com_cam_active_detectors
# )
case 3: # LSSTComCamSim
kfindeisen marked this conversation as resolved.
Show resolved Hide resolved
lsstcomcamsim_gauge.inc()
fan_out_message_list = (
next_visit_message_updated.add_detectors(
dataclasses.asdict(next_visit_message_updated),
# Just use ComCam active detector config.
lsstcomcam_active_detectors,
)
)
knative_serving_url = lsstcomcamsim_knative_serving_url
in_process_requests_gauge = lsstcomcamsim_in_process_requests_gauge
case 1: # LSSTCam
lsstcam_gauge.inc()
fan_out_message_list = (
next_visit_message_updated.add_detectors(
dataclasses.asdict(next_visit_message_updated),
lsst_cam_active_detectors,
lsstcam_active_detectors,
)
)
knative_serving_url = lsst_cam_knative_serving_url
knative_serving_url = lsstcam_knative_serving_url
in_process_requests_gauge = (
lsstcam_in_process_requests_gauge
)
Expand Down
Loading