[WIP] Apply obstore as storage backend #3033

Open · wants to merge 18 commits into base: master
1 change: 1 addition & 0 deletions Dockerfile.dev
@@ -40,6 +40,7 @@ RUN SETUPTOOLS_SCM_PRETEND_VERSION_FOR_FLYTEKIT=$PSEUDO_VERSION \
    -e /flytekit \
    -e /flytekit/plugins/flytekit-deck-standard \
    -e /flytekit/plugins/flytekit-flyteinteractive \
    obstore==0.3.0b9 \
    markdown \
    pandas \
    pillow \
194 changes: 154 additions & 40 deletions flytekit/core/data_persistence.py
@@ -24,69 +24,162 @@
import pathlib
import tempfile
import typing
from functools import lru_cache
from time import sleep
from typing import Any, Dict, Optional, Union, cast
from typing import Any, Dict, Optional, Tuple, Union, cast
from uuid import UUID

import fsspec
from decorator import decorator
from fsspec.asyn import AsyncFileSystem
from fsspec.utils import get_protocol
from obstore.store import AzureStore, GCSStore, S3Store
from obstore.exceptions import GenericError
from typing_extensions import Unpack

from flytekit import configuration
from flytekit.configuration import DataConfig
from flytekit.core.local_fsspec import FlyteLocalFileSystem
from flytekit.core.obstore_filesystem import ObstoreAzureBlobFileSystem, ObstoreGCSFileSystem, ObstoreS3FileSystem
from flytekit.core.utils import timeit
from flytekit.exceptions.system import FlyteDownloadDataException, FlyteUploadDataException
from flytekit.exceptions.user import FlyteAssertion, FlyteDataNotFoundException
from flytekit.interfaces.random import random
from flytekit.loggers import logger
from flytekit.utils.asyn import loop_manager

# Refer to https://github.com/fsspec/s3fs/blob/50bafe4d8766c3b2a4e1fc09669cf02fb2d71454/s3fs/core.py#L198
# Refer to https://github.com/developmentseed/obstore/blob/33654fc37f19a657689eb93327b621e9f9e01494/obstore/python/obstore/store/_aws.pyi#L11
# for key and secret
_FSSPEC_S3_KEY_ID = "key"
_FSSPEC_S3_SECRET = "secret"
_ANON = "anon"
_FSSPEC_S3_KEY_ID = "access_key_id"
_FSSPEC_S3_SECRET = "secret_access_key"
_ANON = "skip_signature"

Consider impact of changing auth constant

Consider whether changing the _ANON constant from "anon" to "skip_signature" might affect existing code that relies on this value. This appears to be a breaking change in the S3 authentication configuration.

Code suggestion
Suggested change
_ANON = "skip_signature"
# TODO: Deprecate "anon" in future versions
_ANON = "anon" # or support both: _ANON = ("anon", "skip_signature")

Code Review Run #ffca15


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged
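
For context (not part of the diff), a minimal sketch of how the renamed constant is consumed, assuming obstore is installed; the bucket name is a placeholder:

from obstore.store import S3Store

anonymous = True
store_kwargs = {}
if anonymous:
    store_kwargs["skip_signature"] = "true"  # obstore S3 config key used by the new code path
store = S3Store.from_env("my-bucket", config=store_kwargs)

# The previous s3fs-based path expressed the same intent as a filesystem kwarg:
# fsspec.filesystem("s3", anon=True)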


Uploadable = typing.Union[str, os.PathLike, pathlib.Path, bytes, io.BufferedReader, io.BytesIO, io.StringIO]


def s3_setup_args(s3_cfg: configuration.S3Config, anonymous: bool = False) -> Dict[str, Any]:
    kwargs: Dict[str, Any] = {
        "cache_regions": True,
    }
@lru_cache
def s3store_from_env(bucket: str, retries: int, **store_kwargs) -> S3Store:
    store = S3Store.from_env(
        bucket,
        config={
            **store_kwargs,
            "aws_allow_http": "true",  # Allow HTTP connections
            "aws_virtual_hosted_style_request": "false",  # Use path-style addressing
        },
    )
    return store


@lru_cache
def gcsstore_from_env(bucket: str) -> GCSStore:
    store = GCSStore.from_env(bucket)
    return store
Comment on lines +74 to +76

Consider preserving anonymous access functionality

Consider passing the anonymous parameter to gcsstore_from_env(), since it was previously used to set the token to _ANON when anonymous=True.

Code suggestion
Suggested change
def gcsstore_from_env(bucket: str) -> GCSStore:
store = GCSStore.from_env(bucket)
return store
def gcsstore_from_env(bucket: str, anonymous: bool = False) -> GCSStore:
store = GCSStore.from_env(bucket)
if anonymous:
store.token = _ANON
return store

Code Review Run #2ba550


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged



@lru_cache
def azurestore_from_env(container: str, **store_kwargs) -> AzureStore:
    store = AzureStore.from_env(
        container,
        config=store_kwargs,
    )
    return store
Comment on lines +61 to +85

Consider consolidating store creation functions

Consider consolidating the store creation functions into a single factory function to reduce code duplication. The current implementation has similar patterns repeated across s3store_from_env, gcsstore_from_env, and azurestore_from_env.

Code suggestion
 -@lru_cache
 -def s3store_from_env(bucket: str, retries: int, **store_kwargs) -> S3Store:
 -    store = S3Store.from_env(
 -        bucket,
 -        config={
 -            **store_kwargs,
 -            "aws_allow_http": "true",
 -            "aws_virtual_hosted_style_request": "false",
 -        },
 -    )
 -    return store
 -
 -@lru_cache
 -def gcsstore_from_env(bucket: str) -> GCSStore:
 -    store = GCSStore.from_env(bucket)
 -    return store
 -
 -@lru_cache
 -def azurestore_from_env(container: str, **store_kwargs) -> AzureStore:
 -    store = AzureStore.from_env(
 -        container,
 -        config=store_kwargs,
 -    )
 -    return store
 +@lru_cache
 +def create_store(store_type: str, container: str, **kwargs) -> Union[S3Store, GCSStore, AzureStore]:
 +    if store_type == "s3":
 +        return S3Store.from_env(container, config={**kwargs, "aws_allow_http": "true", "aws_virtual_hosted_style_request": "false"})
 +    elif store_type == "gcs":
 +        return GCSStore.from_env(container)
 +    elif store_type == "azure":
 +        return AzureStore.from_env(container, config=kwargs)
 +    raise ValueError(f"Unsupported store type: {store_type}")

Code Review Run #2ba550


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged
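
A side note on the @lru_cache decorators above: calling a factory twice with the same bucket and keyword arguments should return the same cached store object instead of re-reading the environment. A rough sketch (the bucket name is a placeholder; assumes the relevant AWS environment variables are set):

store_a = s3store_from_env("my-bucket", 3, skip_signature="true")
store_b = s3store_from_env("my-bucket", 3, skip_signature="true")
assert store_a is store_b  # served from the lru_cache, not rebuilt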



def s3_setup_args(s3_cfg: configuration.S3Config, bucket: str = "", anonymous: bool = False) -> Dict[str, Any]:
    kwargs: Dict[str, Any] = {}
    store_kwargs: Dict[str, Any] = {}

    if s3_cfg.access_key_id:
        kwargs[_FSSPEC_S3_KEY_ID] = s3_cfg.access_key_id
        store_kwargs[_FSSPEC_S3_KEY_ID] = s3_cfg.access_key_id

    if s3_cfg.secret_access_key:
        kwargs[_FSSPEC_S3_SECRET] = s3_cfg.secret_access_key
        store_kwargs[_FSSPEC_S3_SECRET] = s3_cfg.secret_access_key

    # S3fs takes this as a special arg
    if s3_cfg.endpoint is not None:
        kwargs["client_kwargs"] = {"endpoint_url": s3_cfg.endpoint}
        store_kwargs["endpoint_url"] = s3_cfg.endpoint
    if anonymous:
        kwargs[_ANON] = True
        store_kwargs[_ANON] = "true"

    store = s3store_from_env(bucket, s3_cfg.retries, **store_kwargs)

    kwargs["retries"] = s3_cfg.retries

Consider validating S3 retries value

Consider validating the retries value before assigning it to kwargs. A negative or extremely large value could cause issues.

Code suggestion
Suggested change
kwargs["retries"] = s3_cfg.retries
if s3_cfg.retries is not None and 0 <= s3_cfg.retries <= 10:
kwargs["retries"] = s3_cfg.retries

Code Review Run #e101cd


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged


kwargs["store"] = store

return kwargs
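
A rough usage sketch for the reworked helper (hedged: the exact contents of the returned dict depend on the S3 configuration in effect, and it assumes configuration.S3Config.auto() resolves credentials from the environment or config file; the bucket name is a placeholder):

s3_kwargs = s3_setup_args(configuration.S3Config.auto(), bucket="my-bucket")
# Expected shape: the cached obstore store plus fsspec-level options,
# e.g. {"store": <S3Store>, "retries": 3, ...}
fs = fsspec.filesystem("s3", **s3_kwargs)  # resolves to ObstoreS3FileSystem once the registrations at the bottom of this module have run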


def azure_setup_args(azure_cfg: configuration.AzureBlobStorageConfig, anonymous: bool = False) -> Dict[str, Any]:
def gs_setup_args(gcs_cfg: configuration.GCSConfig, bucket: str = "", anonymous: bool = False) -> Dict[str, Any]:
    kwargs: Dict[str, Any] = {}

    store = gcsstore_from_env(bucket)

    kwargs["store"] = store

    return kwargs


def split_path(path: str) -> Tuple[str, str]:
    """
    Split bucket and file path

    Parameters
    ----------
    path : string
        Input path, like `s3://mybucket/path/to/file`

    Examples
    --------
    >>> split_path("s3://mybucket/path/to/file")
    ('mybucket', 'path/to/file')
    """
    support_types = ["s3", "gs", "abfs"]
    protocol = get_protocol(path)
    if protocol not in support_types:
Comment on lines +137 to +139

Consider moving support types to constant

Consider moving the support_types list to a module-level constant since it represents static configuration data. This would improve maintainability and reusability.

Code suggestion
 @@ -1,1 +1,3 @@
 +SUPPORTED_PROTOCOLS = ["s3", "gs", "abfs"]
 +
  def split_path(path: str) -> Tuple[str, str]:
 -    support_types = ["s3", "gs", "abfs"]
 -    protocol = get_protocol(path)
 -    if protocol not in support_types:
 +    protocol = get_protocol(path)
 +    if protocol not in SUPPORTED_PROTOCOLS:

Code Review Run #e101cd


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

        # no bucket for file
        return "", path

    if path.startswith(protocol + "://"):
        path = path[len(protocol) + 3 :]
    elif path.startswith(protocol + "::"):
        path = path[len(protocol) + 2 :]
    path = path.strip("/")

    if "/" not in path:
        return path, ""
    else:
        path_li = path.split("/")
        bucket = path_li[0]
        # use obstore for s3 and gcs only now, no need to split
        # bucket out of path for other storage
        file_path = "/".join(path_li[1:])
        return (bucket, file_path)
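
Illustrative outputs based on the implementation above (plain examples, not part of the diff):

split_path("s3://mybucket/path/to/file")  # -> ("mybucket", "path/to/file")
split_path("gs://bucket-only")            # -> ("bucket-only", "")
split_path("/tmp/local/file.txt")         # -> ("", "/tmp/local/file.txt"); unsupported protocols keep the full path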


def azure_setup_args(
    azure_cfg: configuration.AzureBlobStorageConfig, container: str = "", anonymous: bool = False
) -> Dict[str, Any]:
    kwargs: Dict[str, Any] = {}
    store_kwargs: Dict[str, Any] = {}

    if azure_cfg.account_name:
        kwargs["account_name"] = azure_cfg.account_name
        store_kwargs["account_name"] = azure_cfg.account_name
    if azure_cfg.account_key:
        kwargs["account_key"] = azure_cfg.account_key
        store_kwargs["account_key"] = azure_cfg.account_key
    if azure_cfg.client_id:
        kwargs["client_id"] = azure_cfg.client_id
        store_kwargs["client_id"] = azure_cfg.client_id
    if azure_cfg.client_secret:
        kwargs["client_secret"] = azure_cfg.client_secret
        store_kwargs["client_secret"] = azure_cfg.client_secret
    if azure_cfg.tenant_id:
        kwargs["tenant_id"] = azure_cfg.tenant_id
    kwargs[_ANON] = anonymous
        store_kwargs["tenant_id"] = azure_cfg.tenant_id
    if anonymous:
        kwargs[_ANON] = "true"

Consider using boolean instead of string

Consider using a boolean value directly instead of string 'true' for the _ANON parameter to maintain type consistency. Many systems interpret string 'true' differently than boolean True.

Code suggestion
Suggested change
kwargs[_ANON] = "true"
kwargs[_ANON] = True

Code Review Run #ffca15


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged


    store = azurestore_from_env(container, **store_kwargs)

Consider explicit parameters over kwargs unpacking

Consider using direct function call parameters instead of unpacking store_kwargs to improve code readability and maintainability. The function call could be more explicit about what parameters are being passed.

Code suggestion
 -    store_kwargs: Dict[str, Any] = {}
 -    if azure_cfg.account_name:
 -        store_kwargs["account_name"] = azure_cfg.account_name
 -    if azure_cfg.account_key:
 -        store_kwargs["account_key"] = azure_cfg.account_key
 -    if azure_cfg.client_id:
 -        store_kwargs["client_id"] = azure_cfg.client_id
 -    if azure_cfg.client_secret:
 -        store_kwargs["client_secret"] = azure_cfg.client_secret
 -    if azure_cfg.tenant_id:
 -        store_kwargs["tenant_id"] = azure_cfg.tenant_id
 -    store = azurestore_from_env(container, **store_kwargs)
 +    store = azurestore_from_env(
 +        container,
 +        account_name=azure_cfg.account_name,
 +        account_key=azure_cfg.account_key,
 +        client_id=azure_cfg.client_id,
 +        client_secret=azure_cfg.client_secret,
 +        tenant_id=azure_cfg.tenant_id
 +    )

Code Review Run #2ba550


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged


kwargs["store"] = store

return kwargs


@@ -100,8 +193,6 @@ def get_fsspec_storage_options(
if protocol == "s3":
return {**s3_setup_args(data_config.s3, anonymous=anonymous), **kwargs}
if protocol == "gs":
if anonymous:
kwargs["token"] = _ANON
return kwargs
if protocol in ("abfs", "abfss"):
return {**azure_setup_args(data_config.azure, anonymous=anonymous), **kwargs}
@@ -189,21 +280,27 @@ def get_filesystem(
protocol: typing.Optional[str] = None,
anonymous: bool = False,
path: typing.Optional[str] = None,
bucket: str = "",
**kwargs,
) -> fsspec.AbstractFileSystem:
# TODO: add bucket to adlfs
if not protocol:
return self._default_remote
if protocol == "file":
kwargs["auto_mkdir"] = True
return FlyteLocalFileSystem(**kwargs)
elif protocol == "s3":
s3kwargs = s3_setup_args(self._data_config.s3, anonymous=anonymous)
s3kwargs = s3_setup_args(self._data_config.s3, bucket, anonymous=anonymous)
s3kwargs.update(kwargs)
return fsspec.filesystem(protocol, **s3kwargs) # type: ignore
elif protocol == "gs":
if anonymous:
kwargs["token"] = _ANON
return fsspec.filesystem(protocol, **kwargs) # type: ignore
gskwargs = gs_setup_args(self._data_config.gcs, bucket, anonymous=anonymous)
gskwargs.update(kwargs)
return fsspec.filesystem(protocol, **gskwargs) # type: ignore
elif protocol == "abfs":
azkwargs = azure_setup_args(self._data_config.azure, bucket, anonymous=anonymous)
azkwargs.update(kwargs)
return fsspec.filesystem(protocol, **azkwargs) # type: ignore
elif protocol == "ftp":
kwargs.update(fsspec.implementations.ftp.FTPFileSystem._get_kwargs_from_urls(path))
return fsspec.filesystem(protocol, **kwargs)
@@ -216,16 +313,20 @@ def get_filesystem(
return fsspec.filesystem(protocol, **kwargs)

async def get_async_filesystem_for_path(
self, path: str = "", anonymous: bool = False, **kwargs
self, path: str = "", bucket: str = "", anonymous: bool = False, **kwargs
) -> Union[AsyncFileSystem, fsspec.AbstractFileSystem]:
protocol = get_protocol(path)
loop = asyncio.get_running_loop()

return self.get_filesystem(protocol, anonymous=anonymous, path=path, asynchronous=True, loop=loop, **kwargs)
return self.get_filesystem(
protocol, anonymous=anonymous, path=path, bucket=bucket, asynchronous=True, loop=loop, **kwargs
)

def get_filesystem_for_path(self, path: str = "", anonymous: bool = False, **kwargs) -> fsspec.AbstractFileSystem:
def get_filesystem_for_path(
self, path: str = "", bucket: str = "", anonymous: bool = False, **kwargs
) -> fsspec.AbstractFileSystem:
protocol = get_protocol(path)
return self.get_filesystem(protocol, anonymous=anonymous, path=path, **kwargs)
return self.get_filesystem(protocol, anonymous=anonymous, path=path, bucket=bucket, **kwargs)

@staticmethod
def is_remote(path: Union[str, os.PathLike]) -> bool:
@@ -295,7 +396,8 @@ def exists(self, path: str) -> bool:

@retry_request
async def get(self, from_path: str, to_path: str, recursive: bool = False, **kwargs):
file_system = await self.get_async_filesystem_for_path(from_path)
bucket, from_path_file_only = split_path(from_path)
file_system = await self.get_async_filesystem_for_path(from_path, bucket)
Comment on lines +399 to +400

Handle empty bucket case for storage

Consider handling the case where split_path() returns an empty bucket for non-file protocols. Currently, passing an empty bucket to get_async_filesystem_for_path() could cause issues with cloud storage access.

Code suggestion
Suggested change
bucket, from_path_file_only = split_path(from_path)
file_system = await self.get_async_filesystem_for_path(from_path, bucket)
bucket, from_path_file_only = split_path(from_path)
protocol = get_protocol(from_path)
if protocol not in ['file'] and not bucket:
raise ValueError(f'Empty bucket not allowed for protocol {protocol}')
file_system = await self.get_async_filesystem_for_path(from_path, bucket)

Code Review Run #0b7f4d


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

if recursive:
from_path, to_path = self.recursive_paths(from_path, to_path)
try:
@@ -307,16 +409,21 @@ async def get(self, from_path: str, to_path: str, recursive: bool = False, **kwa
)
logger.info(f"Getting {from_path} to {to_path}")
if isinstance(file_system, AsyncFileSystem):
dst = await file_system._get(from_path, to_path, recursive=recursive, **kwargs) # pylint: disable=W0212
dst = await file_system._get(from_path_file_only, to_path, recursive=recursive, **kwargs) # pylint: disable=W0212
else:
dst = file_system.get(from_path, to_path, recursive=recursive, **kwargs)
if isinstance(dst, (str, pathlib.Path)):
return dst
return to_path
except OSError as oe:
except (OSError, GenericError) as oe:
logger.debug(f"Error in getting {from_path} to {to_path} rec {recursive} {oe}")
if isinstance(file_system, AsyncFileSystem):
exists = await file_system._exists(from_path) # pylint: disable=W0212
try:
exists = await file_system._exists(from_path) # pylint: disable=W0212
except GenericError:
# for obstore, as it does not raise FileNotFoundError in fsspec but GenericError
# force it to try get_filesystem(anonymous=True)
exists = True
else:
exists = file_system.exists(from_path)
if not exists:
@@ -336,7 +443,8 @@ async def _put(self, from_path: str, to_path: str, recursive: bool = False, **kw
More of an internal function to be called by put_data and put_raw_data
This does not need a separate sync function.
"""
file_system = await self.get_async_filesystem_for_path(to_path)
bucket, to_path_file_only = split_path(to_path)
file_system = await self.get_async_filesystem_for_path(to_path, bucket)
Comment on lines +446 to +447

Consider validating bucket before filesystem call

Consider validating the bucket parameter before passing it to get_async_filesystem_for_path(). An empty bucket could cause issues with certain storage backends. Similar issues were also found in:

  • flytekit/core/data_persistence.py (line 318)
  • flytekit/core/data_persistence.py (line 521)
  • flytekit/core/data_persistence.py (line 308)
Code suggestion
Suggested change
bucket, to_path_file_only = split_path(to_path)
file_system = await self.get_async_filesystem_for_path(to_path, bucket)
bucket, to_path_file_only = split_path(to_path)
protocol = get_protocol(to_path)
if protocol in ['s3', 'gs', 'abfs'] and not bucket:
raise ValueError(f'Bucket cannot be empty for {protocol} protocol')
file_system = await self.get_async_filesystem_for_path(to_path, bucket)

Code Review Run #39883a


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Comment on lines +446 to +447

Consider extracting path splitting logic

Consider extracting the bucket and path splitting logic into a separate method to improve code reusability and maintainability. The split_path function is used in multiple places and could be encapsulated better.

Code suggestion
Suggested change
bucket, to_path_file_only = split_path(to_path)
file_system = await self.get_async_filesystem_for_path(to_path, bucket)
bucket, path = self._split_and_get_bucket_path(to_path)
file_system = await self.get_async_filesystem_for_path(to_path, bucket)

Code Review Run #0b7f4d


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

from_path = self.strip_file_header(from_path)
if recursive:
# Only check this for the local filesystem
@@ -354,7 +462,7 @@ async def _put(self, from_path: str, to_path: str, recursive: bool = False, **kw
kwargs["metadata"] = {}
kwargs["metadata"].update(self._execution_metadata)
if isinstance(file_system, AsyncFileSystem):
dst = await file_system._put(from_path, to_path, recursive=recursive, **kwargs) # pylint: disable=W0212
dst = await file_system._put(from_path, to_path_file_only, recursive=recursive, **kwargs) # pylint: disable=W0212
else:
dst = file_system.put(from_path, to_path, recursive=recursive, **kwargs)
if isinstance(dst, (str, pathlib.Path)):
@@ -423,11 +531,13 @@ async def async_put_raw_data(
r = await self._put(from_path, to_path, **kwargs)
return r or to_path

bucket, _ = split_path(to_path)

# See https://github.com/fsspec/s3fs/issues/871 for more background and pending work on the fsspec side to
# support effectively async open(). For now these use-cases below will revert to sync calls.
# raw bytes
if isinstance(lpath, bytes):
fs = self.get_filesystem_for_path(to_path)
fs = self.get_filesystem_for_path(to_path, bucket)
with fs.open(to_path, "wb", **kwargs) as s:
s.write(lpath)
return to_path
@@ -436,7 +546,7 @@ async def async_put_raw_data(
if isinstance(lpath, io.BufferedReader) or isinstance(lpath, io.BytesIO):
if not lpath.readable():
raise FlyteAssertion("Buffered reader must be readable")
fs = self.get_filesystem_for_path(to_path)
fs = self.get_filesystem_for_path(to_path, bucket)
lpath.seek(0)
with fs.open(to_path, "wb", **kwargs) as s:
while data := lpath.read(read_chunk_size_bytes):
@@ -446,7 +556,7 @@ async def async_put_raw_data(
if isinstance(lpath, io.StringIO):
if not lpath.readable():
raise FlyteAssertion("Buffered reader must be readable")
fs = self.get_filesystem_for_path(to_path)
fs = self.get_filesystem_for_path(to_path, bucket)
lpath.seek(0)
with fs.open(to_path, "wb", **kwargs) as s:
while data_str := lpath.read(read_chunk_size_bytes):
@@ -635,6 +745,10 @@ async def async_put_data(
put_data = loop_manager.synced(async_put_data)


fsspec.register_implementation("s3", ObstoreS3FileSystem)
fsspec.register_implementation("gs", ObstoreGCSFileSystem)
fsspec.register_implementation("abfs", ObstoreAzureBlobFileSystem)

flyte_tmp_dir = tempfile.mkdtemp(prefix="flyte-")
default_local_file_access_provider = FileAccessProvider(
local_sandbox_dir=os.path.join(flyte_tmp_dir, "sandbox"),