dask_cluster.py
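"""Spin up an adaptive Dask FargateCluster on AWS and keep it alive.

All AWS networking, image, and task-definition settings are read from
environment variables.
"""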
import os
import time
from dask_cloudprovider.aws import FargateCluster
__author__ = "jkanche"
__copyright__ = "jkanche"
__license__ = "MIT"
# Optional EFS volume definition; pairs with the commented-out
# mount_points/volumes arguments below.
# volume = {"containerPath": "/files", "sourceVolume": "dask_volume"}
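# Launch a Fargate-backed Dask cluster; networking, image, and task
# definitions come from the environment.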
cluster = FargateCluster(
    vpc=os.getenv("VPC_ID"),
    region_name="us-west-2",
    subnets=os.getenv("SUBNETS").split(","),
    security_groups=[os.getenv("SECURITY_GROUPS")],
    image=os.getenv("DASK_IMAGE"),
    n_workers=1,
    cluster_arn=os.getenv("FARGATE_CLUSTER_ARN"),
    ## if you need to add volumes ##
    # mount_points=[volume],
    # volumes=[
    #     {
    #         "name": "dask_volume",
    #         "efsVolumeConfiguration": {
    #             "fileSystemId": os.getenv("FILE_SYSTEM_ID"),
    #             "transitEncryption": "ENABLED",
    #         },
    #     }
    # ],
    fargate_use_private_ip=True,
    scheduler_address=os.getenv("SCHEDULER_ADDRESS"),
    environment={
        "FILES_PATH": os.getenv("FILES_PATH"),
        "REDIS_DOMAIN": os.getenv("REDIS_DOMAIN"),
        "REDIS_PASSWORD": os.getenv("REDIS_PASSWORD"),
    },
    cloudwatch_logs_group=os.getenv("WORKER_LOG_GROUP"),
    worker_task_definition_arn=os.getenv("WORKER_TASK_DEF"),
)
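# Scale adaptively between 1 worker and DASK_MAX_WORKERS based on load.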
cluster.adapt(minimum=1, maximum=int(os.getenv("DASK_MAX_WORKERS")))
print(f"Cluster Initialized {cluster}.")
# So that the deployer is always running
# in the background!
while True:
time.sleep(1)
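# Illustrative sketch only (not part of the original script): a separate
# client process could connect to this cluster's scheduler roughly as
# below, assuming SCHEDULER_ADDRESS points at a reachable Dask scheduler.
#
# from dask.distributed import Client
#
# client = Client(os.getenv("SCHEDULER_ADDRESS"))
# print(client.submit(lambda x: x + 1, 10).result())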