diff --git a/src/enqueue.py b/src/enqueue.py index a44e6c8..70abde9 100644 --- a/src/enqueue.py +++ b/src/enqueue.py @@ -40,6 +40,9 @@ r = setup_redis() notification_secret = os.environ["NOTIFICATION_SECRET"] regexp = re.compile(os.environ.get("DATASET_REGEXP", r"fits$")) +profile = os.environ.get("PROFILE", "") +if profile != "": + profile += "@" def enqueue_objects(objects): @@ -119,7 +122,7 @@ def notify(): logger.info("Unrecognized secret %s", r["opaqueData"]) continue object_names.append( - r["s3"]["bucket"]["name"] + "/" + urllib.parse.unquote_plus(r["s3"]["object"]["key"]) + profile + r["s3"]["bucket"]["name"] + "/" + urllib.parse.unquote_plus(r["s3"]["object"]["key"]) ) info_list = enqueue_objects(object_names) update_stats(info_list) diff --git a/src/ingest.py b/src/ingest.py index 311e448..42a1d93 100644 --- a/src/ingest.py +++ b/src/ingest.py @@ -46,12 +46,6 @@ logger = setup_logging(__name__) r = setup_redis() bucket = os.environ["BUCKET"] -if (pos := bucket.find("@")) >= 0: - pos += 1 - profile = bucket[:pos] - bucket = bucket[pos:] -else: - profile = "" if bucket.startswith("rubin:"): os.environ["LSST_DISABLE_BUCKET_VALIDATION"] = "1" redis_queue = f"QUEUE:{bucket}" @@ -205,7 +199,7 @@ def main(): # Process any entries on the worker queue. if r.llen(worker_queue) > 0: blobs = r.lrange(worker_queue, 0, -1) - resources = [ResourcePath(f"s3://{profile}{b.decode()}") for b in blobs] + resources = [ResourcePath(f"s3://{b.decode()}") for b in blobs] # Ingest if we have resources if resources: