Unverified commit 75cf1cb5, authored by Mohamed Awnallah, committed by GitHub

sdks/python: enable recursive deletion for GCSFileSystem Paths (#33611)

* sdks/python: enable recursive deletion for GCS

In this commit, we enable recursive deletion for
GCS (Google Cloud Storage) paths, including directories
and blobs.

Changes include:
- Updated the `delete` method to support recursive deletion of GCS
  directories (prefixes).
- If the path points to a directory, all blobs under that prefix are
  deleted.
- Refactored logic to handle both single blob and directory deletion
  cases.
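The recursive-deletion behavior described above can be sketched with a minimal in-memory stand-in for a GCS bucket. This is an illustrative sketch only: the `FakeBucket` class, its dict-backed blob store, and the free-standing `delete` function below are invented for this example and are not Beam's actual classes.

```python
class FakeBucket:
    """Minimal in-memory stand-in for a GCS bucket (illustrative only)."""

    def __init__(self):
        self.blobs = {}  # blob name -> contents

    def list_blobs(self, prefix=None):
        # Return the names of blobs whose name starts with the prefix.
        return [
            name for name in self.blobs
            if prefix is None or name.startswith(prefix)
        ]

    def delete_blob(self, name):
        self.blobs.pop(name, None)


def delete(bucket, blob_name, recursive=False):
    """Mirrors the shape of the change: recursive=True removes a whole prefix."""
    if recursive:
        # Directory (prefix) case: delete every blob under the prefix.
        for name in list(bucket.list_blobs(prefix=blob_name)):
            bucket.delete_blob(name)
    else:
        # Single-blob case.
        bucket.delete_blob(blob_name)


bucket = FakeBucket()
bucket.blobs = {"dir/a": b"", "dir/b": b"", "other": b""}
delete(bucket, "dir/", recursive=True)
print(sorted(bucket.blobs))  # -> ['other']
```

Only the blobs under `dir/` are removed; an unrelated blob survives, which is the semantics the commit adds for directory-style paths.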

* sdks/python: update delete test case for GCS

In this commit, we update the delete test to verify
recursive deletion of directories (prefixes) in GCS.

Changes include:
- Added test for deleting a GCS directory (prefix) with multiple files.
- Verified files under a directory are deleted recursively when using the delete method.
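The assertions the updated test makes can be sketched against a plain dict standing in for the bucket's blob store; the file names and the `delete_prefix` helper here are invented for illustration and are not the test's actual helpers.

```python
# A dict standing in for bucket.blobs: blob name -> contents.
blobs = {f"directory_to_delete/file{i}": b"x" * 1024 for i in (1, 2, 3)}


def delete_prefix(store, prefix):
    # Recursive deletion: drop every blob whose name starts with the prefix.
    for name in [n for n in store if n.startswith(prefix)]:
        del store[name]


# The files exist before deletion.
assert all(f"directory_to_delete/file{i}" in blobs for i in (1, 2, 3))

delete_prefix(blobs, "directory_to_delete/")

# All files under the prefix are gone afterwards.
assert not blobs
```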

* CHANGES.md: update CHANGES for `2.63.0`

* CHANGES.md: capitalize `enable` word
Parent fc43c12b
@@ -73,6 +73,7 @@
 * Add BigQuery vector/embedding ingestion and enrichment components to apache_beam.ml.rag (Python) ([#33413](https://github.com/apache/beam/pull/33413)).
 * Upgraded to protobuf 4 (Java) ([#33192](https://github.com/apache/beam/issues/33192)).
 * [GCSIO] Added retry logic to each batch method of the GCS IO (Python) ([#33539](https://github.com/apache/beam/pull/33539))
+* [GCSIO] Enable recursive deletion for GCSFileSystem Paths (Python) ([#33611](https://github.com/apache/beam/pull/33611)).
 * External, Process based Worker Pool support added to the Go SDK container. ([#33572](https://github.com/apache/beam/pull/33572))
   * This is used to enable sidecar containers to run SDK workers for some runners.
   * See https://beam.apache.org/documentation/runtime/sdk-harness-config/ for details.
@@ -354,7 +354,8 @@ class GCSFileSystem(FileSystem):
     for path in paths:
       if path.endswith('/'):
-        path_to_use = path + '*'
+        self._gcsIO().delete(path, recursive=True)
+        continue
       else:
         path_to_use = path
       match_result = self.match([path_to_use])[0]
@@ -244,16 +244,37 @@ class GcsIO(object):
     else:
       raise ValueError('Invalid file open mode: %s.' % mode)

-  def delete(self, path):
+  def delete(self, path, recursive=False):
     """Deletes the object at the given GCS path.

+    If the path is a directory (prefix), it deletes all blobs under that prefix
+    when recursive=True.
+
     Args:
       path: GCS file path pattern in the form gs://<bucket>/<name>.
+      recursive (bool, optional): If True, deletes all objects under the prefix
+        when the path is a directory (default: False).
     """
     bucket_name, blob_name = parse_gcs_path(path)
     bucket = self.client.bucket(bucket_name)
+    if recursive:
+      # List and delete all blobs under the prefix.
+      blobs = bucket.list_blobs(prefix=blob_name)
+      for blob in blobs:
+        self._delete_blob(bucket, blob.name)
+    else:
+      # Delete only the specific blob.
+      self._delete_blob(bucket, blob_name)
+
+  def _delete_blob(self, bucket, blob_name):
+    """Helper method to delete a single blob from GCS.
+
+    Args:
+      bucket: The GCS bucket object.
+      blob_name: The name of the blob to delete under the bucket.
+    """
     if self._use_blob_generation:
-      # blob can be None if not found
+      # Fetch blob generation if required.
       blob = bucket.get_blob(blob_name, retry=self._storage_client_retry)
       generation = getattr(blob, "generation", None)
     else:
@@ -153,6 +153,10 @@ class FakeBucket(object):
     if name in bucket.blobs:
       del bucket.blobs[name]

+  def list_blobs(self, prefix=None, **kwargs):
+    bucket = self._get_canonical_bucket()
+    return self.client.list_blobs(bucket, prefix, **kwargs)
+

 class FakeBlob(object):
   def __init__(
@@ -445,20 +449,43 @@ class TestGCSIO(unittest.TestCase):
       self.gcs.open(file_name, 'r+b')

   def test_delete(self):
+    # File path.
     file_name = 'gs://gcsio-test/delete_me'
     file_size = 1024
     bucket_name, blob_name = gcsio.parse_gcs_path(file_name)
+
     # Test deletion of non-existent file.
     bucket = self.client.get_bucket(bucket_name)
     self.gcs.delete(file_name)

+    # Insert a random file for testing.
     self._insert_random_file(self.client, file_name, file_size)
     self.assertTrue(blob_name in bucket.blobs)
+
+    # Deleting the file.
     self.gcs.delete(file_name)
     self.assertFalse(blob_name in bucket.blobs)
+
+    # Now test deleting a directory (prefix) with multiple files.
+    prefix = 'gs://gcsio-test/directory_to_delete/'
+    file_names = [f"{prefix}file1", f"{prefix}file2", f"{prefix}file3"]
+    blobs = [gcsio.parse_gcs_path(file_name) for file_name in file_names]
+
+    # Insert random files under the prefix.
+    for file_name in file_names:
+      self._insert_random_file(self.client, file_name, file_size)
+
+    # Verify the files exist before deletion.
+    for blob in blobs:
+      self.assertTrue(blob[1] in bucket.blobs)
+
+    # Deleting the directory (all files under the prefix).
+    self.gcs.delete(prefix, recursive=True)
+
+    # Verify that the files are deleted.
+    for blob in blobs:
+      self.assertFalse(blob[1] in bucket.blobs)

   def test_copy(self):
     src_file_name = 'gs://gcsio-test/source'
     dest_file_name = 'gs://gcsio-test/dest'