Skip to content
This repository has been archived by the owner on Sep 6, 2023. It is now read-only.

Commit

Permalink
Merge pull request #39 from LuccaSA/async
Browse files Browse the repository at this point in the history
Add command to auto-decompress
  • Loading branch information
seguins authored Jun 6, 2022
2 parents 0eb67a1 + 3a2b8f3 commit 85e4edd
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 18 deletions.
33 changes: 33 additions & 0 deletions TCC.Lib/Helpers/GoogleAuthHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using Google.Apis.Auth.OAuth2;
using Google.Cloud.Storage.V1;
using System;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace TCC.Lib.Helpers
{
public static class GoogleAuthHelper
{
public static async Task<GoogleCredential> GetGoogleClientAsync(string googleStorageCredential, CancellationToken token)
{
if (File.Exists(googleStorageCredential))
{
return await GoogleCredential.FromFileAsync(googleStorageCredential, token);
}
else
{
var decodedJson = Encoding.UTF8.GetString(Convert.FromBase64String(googleStorageCredential));
return GoogleCredential.FromJson(decodedJson);
}
}

public static async Task<StorageClient> GetGoogleStorageClientAsync(string googleStorageCredential, CancellationToken token)
{
var credential = await GetGoogleClientAsync(googleStorageCredential, token);
return await StorageClient.CreateAsync(credential);
}

}
}
18 changes: 2 additions & 16 deletions TCC.Lib/Storage/RemoteStorageFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using TCC.Lib.Helpers;
using TCC.Lib.Options;

namespace TCC.Lib.Storage
Expand Down Expand Up @@ -38,7 +39,7 @@ public static async Task<IRemoteStorage> GetRemoteStorageAsync(this CompressOpti
logger.LogCritical("Configuration error for google storage upload");
return new NoneRemoteStorage();
}
StorageClient storage = await GetGoogleStorageClient(option, token);
StorageClient storage = await GoogleAuthHelper.GetGoogleStorageClientAsync(option.GoogleStorageCredential, token);
return new GoogleRemoteStorage(storage, option.GoogleStorageBucketName);
}
case UploadMode.None:
Expand All @@ -48,20 +49,5 @@ public static async Task<IRemoteStorage> GetRemoteStorageAsync(this CompressOpti
throw new ArgumentOutOfRangeException();
}
}

private static async Task<StorageClient> GetGoogleStorageClient(CompressOption option, CancellationToken token)
{
GoogleCredential credential;
if (File.Exists(option.GoogleStorageCredential))
{
credential = await GoogleCredential.FromFileAsync(option.GoogleStorageCredential, token);
}
else
{
var decodedJson = Encoding.UTF8.GetString(Convert.FromBase64String(option.GoogleStorageCredential));
credential = GoogleCredential.FromJson(decodedJson);
}
return await StorageClient.CreateAsync(credential);
}
}
}
50 changes: 50 additions & 0 deletions TCC/Parser/AutoDecompressCommand.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
using System.Collections.Generic;
using System.CommandLine;
using System.Threading.Tasks;
using TCC.Lib.Options;

namespace TCC.Parser
{
public class AutoDecompressOptionBinding : DecompressOption
{
public AutoDecompressOptionBinding()
{
base.Threads = 1;
}
public new string Threads { set => base.Threads = ParseCommandLineHelper.ThreadsParsing(value); }
public string Password { set => ParseCommandLineHelper.ExtractInlinePassword(this, value); }
public string PassFile { set => ParseCommandLineHelper.ExtractPasswordFile(this, value); }
public string Key { set => ParseCommandLineHelper.ExtractAsymetricFile(this, Mode.Decompress, value); }
public string GoogleStorageCredential { get; set; }
public string GoogleProjectId { get; set; }
public string GoogleSubscriptionId { get; set; }
public string TemporaryDirectory { get; set; }
}


public class AutoDecompressCommand : TccCommand<AutoDecompressOptionBinding>
{
public AutoDecompressCommand() : base("auto-decompress", "Continuous decompress from Google Cloud Storage")
{
}

protected override IEnumerable<Argument> CreateArguments()
{
yield break;
}
protected override IEnumerable<Option> CreateOptions()
{
foreach (var option in BaseCmdOptions.CreateBaseOptions())
{
yield return option;
}
yield return new Option<string>(new[] { "--googleStorageCredential" }, "Google Cloud Storage credential json, either full path or base64");
yield return new Option<string>(new[] { "--googleProjectId" }, "Google Cloud Pub/Sub, storage projectId");
yield return new Option<string>(new[] { "--googleSubscriptionId" }, "Google Cloud Pub/Sub subscriptionId");
yield return new Option<string>(new[] { "--temporaryDirectory" }, "Directory where archive are downloaded temporary");
}

protected override Task RunAsync(ITccController controller, AutoDecompressOptionBinding option)
=> controller.AutoDecompressAsync(option);
}
}
1 change: 1 addition & 0 deletions TCC/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ await BuildCommandLine()
public static CommandLineBuilder BuildCommandLine()
=> new CommandLineBuilder(new RootCommand
{
new AutoDecompressCommand(),
new CompressCommand(),
new DecompressCommand(),
new BenchmarkCommand()
Expand Down
1 change: 1 addition & 0 deletions TCC/TCC.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Google.Cloud.PubSub.V1" Version="2.10.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="6.0.0" />
<PackageReference Include="Serilog.AspNetCore" Version="5.0.0" />
Expand Down
76 changes: 74 additions & 2 deletions TCC/TccController.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
using Microsoft.Extensions.Logging;
using Google.Apis.Auth.OAuth2;
using Google.Cloud.PubSub.V1;
using Google.Cloud.Storage.V1;
using Grpc.Auth;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using TCC.Lib;
using TCC.Lib.Benchmark;
using TCC.Lib.Database;
using TCC.Lib.Helpers;
using TCC.Lib.Notification;
using TCC.Lib.Options;
using TCC.Parser;

namespace TCC;

Expand All @@ -17,6 +25,7 @@ public interface ITccController
Task CompressAsync(CompressOption option);
Task DecompressAsync(DecompressOption option);
Task BenchmarkAsync(BenchmarkOption option);
Task AutoDecompressAsync(AutoDecompressOptionBinding option);
}

public class TccController : ITccController
Expand Down Expand Up @@ -61,6 +70,70 @@ public async Task BenchmarkAsync(BenchmarkOption option)
await LogResultAsync(operationResult, Mode.Benchmark, null);
}

public async Task AutoDecompressAsync(AutoDecompressOptionBinding option)
{
var gcpCredential = await GoogleAuthHelper.GetGoogleClientAsync(option.GoogleStorageCredential, new CancellationToken());
var subscriber = await GetGoogleClientAsync(gcpCredential, option);
var storage = await StorageClient.CreateAsync(gcpCredential);

await _databaseSetup.EnsureDatabaseExistsAsync(Mode.Decompress);

// Use the client as you'd normally do, to listen for messages in this example.
await subscriber.StartAsync(async (msg, cancellationToken) =>
{
if (!msg.Attributes.Any(kvp => kvp.Key == "eventType" && kvp.Value == "OBJECT_FINALIZE"))
{
_logger.LogDebug("EventType not found : {attributes}", msg.Attributes);
return SubscriberClient.Reply.Ack;
}
var msgData = JsonSerializer.Deserialize<ObjectStorageEvent>(msg.Data.ToStringUtf8(), new JsonSerializerOptions { PropertyNameCaseInsensitive = true });
if (string.IsNullOrEmpty(msgData.Name))
{
_logger.LogDebug("Object name not found");
return SubscriberClient.Reply.Ack;
}
if (string.IsNullOrEmpty(msgData.Bucket))
{
_logger.LogDebug("Bucket name not found");
return SubscriberClient.Reply.Ack;
}
var fileName = Path.GetFileName(msgData.Name);
string tempPath;
if (string.IsNullOrEmpty(option.TemporaryDirectory))
{
tempPath = Path.Join(option.TemporaryDirectory, fileName);
}
else
{
tempPath = Path.GetRandomFileName();
}
using (var outputFile = File.OpenWrite(tempPath))
{
await storage.DownloadObjectAsync(msgData.Bucket, msgData.Name, outputFile);
}
_logger.LogDebug("{fileName} downloaded in {path}", fileName, tempPath);
option.SourceDirOrFile = tempPath;
var operationResult = await _tarCompressCrypt.DecompressAsync(option);
await _databaseSetup.CleanupDatabaseAsync(Mode.Decompress);
await LogResultAsync(operationResult, Mode.Decompress, option);
File.Delete(tempPath);
return SubscriberClient.Reply.Ack;
});
}
private record ObjectStorageEvent(string Bucket, string Name);
private static async Task<SubscriberClient> GetGoogleClientAsync(GoogleCredential credential, AutoDecompressOptionBinding option)
{
var subscriptionName = new SubscriptionName(option.GoogleProjectId, option.GoogleSubscriptionId);
// Create a google cloud pub/sub client that reads messages one by one
return await SubscriberClient.CreateAsync(
subscriptionName,
new SubscriberClient.ClientCreationSettings(clientCount: 1, credentials: credential.ToChannelCredentials()),
new SubscriberClient.Settings { FlowControlSettings = new Google.Api.Gax.FlowControlSettings(1, null) }
);
}

private async Task LogResultAsync(OperationSummary operationResult, Mode mode, ISlackOption slackOption)
{
Expand Down Expand Up @@ -142,5 +215,4 @@ private void WriteAuditFile(Mode mode, OperationSummary op)
}
}
}

}

0 comments on commit 85e4edd

Please sign in to comment.