Skip to content

Commit

Permalink
Merge pull request #3 from Kentico/feat/9466233
Browse files Browse the repository at this point in the history
  • Loading branch information
seangwright authored Jul 25, 2023
2 parents 02fcf60 + d9cca42 commit 0a63e65
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 80 deletions.
29 changes: 25 additions & 4 deletions .azuredevops/pipelines/build-and-release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,31 @@ stages:
- name: Configuration
value: Release

- name: ProjectPath
- name: ProjectFolder
value: src/Kentico.Xperience.Contacts.Importer

- name: ProjectFilePath
value: src/Kentico.Xperience.Contacts.Importer/Kentico.Xperience.Contacts.Importer.csproj

steps:
- task: PowerShell@2
displayName: Set Node.js version from package.json
inputs:
targetType: inline
script: |
$fileContent = Get-Content -Path './${{ variables.ProjectFolder }}/Client/package.json' -Raw
$jsonObject = ConvertFrom-Json -InputObject $fileContent
# Get the value of engines.node
$enginesNode = $jsonObject.engines.node
Write-Host "Required Node version $enginesNode"
Write-Host "##vso[task.setvariable variable=PACKAGE_JSON_NODE_VERSION]$enginesNode"
- task: UseNode@1
displayName: "Install Node.js from package.json version"
inputs:
version: $(PACKAGE_JSON_NODE_VERSION)

- task: UseDotNet@2
displayName: Select dotnet version
inputs:
Expand All @@ -53,23 +74,23 @@ stages:
displayName: Restore dependencies
inputs:
command: restore
projects: ${{ variables.ProjectPath }}
projects: ${{ variables.ProjectFilePath }}
feedsToUse: select
restoreArguments: --locked-mode

- task: DotNetCoreCLI@2
displayName: Build
inputs:
command: build
projects: ${{ variables.ProjectPath }}
projects: ${{ variables.ProjectFilePath }}
configuration: ${{ variables.Configuration }}
arguments: --no-restore

- task: DotNetCoreCLI@2
displayName: Create NuGet package
inputs:
command: pack
packagesToPack: ${{ variables.ProjectPath }}
packagesToPack: ${{ variables.ProjectFilePath }}
configuration: ${{ variables.Configuration }}
packDirectory: $(System.DefaultWorkingDirectory)/packages
includesymbols: true
Expand Down
File renamed without changes
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ async Task SendProgressReport(string message)
{
if (webSocket.State == WebSocketState.Open)
{
var payload = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new { type = "msg", payload = $"{message}" }));
byte[] payload = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new { type = "msg", payload = $"{message}" }));
await webSocket.SendAsync(new ArraySegment<byte>(payload, 0, payload.Length), WebSocketMessageType.Text, true, CancellationToken.None);
}
}
Expand All @@ -66,7 +66,7 @@ async Task SendTooFastReport()
{
if (webSocket.State == WebSocketState.Open)
{
var payload = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new { type = "toofast", payload = $"" }));
byte[] payload = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new { type = "toofast", payload = $"" }));
await webSocket.SendAsync(new ArraySegment<byte>(payload, 0, payload.Length), WebSocketMessageType.Text, true, CancellationToken.None);
}
}
Expand All @@ -75,7 +75,7 @@ async Task SendProgressFinished()
{
if (webSocket.State == WebSocketState.Open)
{
var payload = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new { type = "finished", payload = $"" }));
byte[] payload = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new { type = "finished", payload = $"" }));
await webSocket.SendAsync(new ArraySegment<byte>(payload, 0, payload.Length), WebSocketMessageType.Text, true, CancellationToken.None);
}
}
Expand All @@ -84,7 +84,7 @@ async Task SendConfirmHeader()
{
if (webSocket.State == WebSocketState.Open)
{
var msg = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new { type = "headerConfirmed", payload = "" }));
byte[] msg = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new { type = "headerConfirmed", payload = "" }));
await webSocket.SendAsync(new ArraySegment<byte>(msg, 0, msg.Length), WebSocketMessageType.Text, true, CancellationToken.None);
}
}
Expand All @@ -99,7 +99,7 @@ async Task SendConfirmHeader()

await SendConfirmHeader();

var consumerIsRunning = true;
bool consumerIsRunning = true;
var ms = new AsynchronousStream(1024 * 32 * 500);

var consumerTask = Task.Run(async () =>
Expand All @@ -113,13 +113,7 @@ await importService.RunImport(ms, context, async (result, totalProcessed) =>
async exception => { await SendProgressReport($"{exception}"); }
);
await SendProgressReport($"...finished");
// await SendProgressFinished();
}
// catch (Exception ex)
// {
// logService.LogException(SOURCE, "CONSUMER", ex);
// await SendProgressReport($"Consumer error: {ex}");
// }
finally
{
consumerIsRunning = false;
Expand All @@ -129,9 +123,7 @@ await importService.RunImport(ms, context, async (result, totalProcessed) =>
var producerTask = Task.Run(async () =>
{
WebSocketReceiveResult? receiveResult = null;
// try
// {
var bufferSize = 1024 * 32;
int bufferSize = 1024 * 32;
while (true)
{
Expand All @@ -140,7 +132,7 @@ await importService.RunImport(ms, context, async (result, totalProcessed) =>
break;
}
var buffer = new byte[bufferSize];
byte[] buffer = new byte[bufferSize];
receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
if (!receiveResult.CloseStatus.HasValue)
Expand All @@ -153,8 +145,8 @@ await importService.RunImport(ms, context, async (result, totalProcessed) =>
ms.Flush();
}
var count = receiveResult.Count;
var response = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new { type = "progress", payload = count }));
int count = receiveResult.Count;
byte[] response = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new { type = "progress", payload = count }));
await webSocket.SendAsync(new ArraySegment<byte>(response, 0, response.Length), WebSocketMessageType.Text, true, CancellationToken.None);
if (ms.CachedBlocks > 3500)
Expand All @@ -169,28 +161,9 @@ await importService.RunImport(ms, context, async (result, totalProcessed) =>
break;
}
}
// }
// catch (Exception ex)
// {
// logService.LogException(SOURCE, "PRODUCER", ex);
// await SendProgressReport($"Producer error: {ex}");
// await SendProgressFinished();
// }
// finally
// {
//
// // await consumerTask;
// // if (receiveResult != null)
// // {
// // await webSocket.CloseAsync(
// // WebSocketCloseStatus.NormalClosure,
// // receiveResult.CloseStatusDescription,
// // CancellationToken.None);
// // }
// }
});

var socketAvailable = true;
bool socketAvailable = true;

try
{
Expand Down Expand Up @@ -225,9 +198,6 @@ await importService.RunImport(ms, context, async (result, totalProcessed) =>
logService.LogException(SOURCE, "CONSUMER", e);
await SendProgressReport($"{e}");
}
// finally
// {
// }

if (socketAvailable)
{
Expand All @@ -247,7 +217,7 @@ await webSocket.CloseAsync(

while (true)
{
var buffer = new byte[bufferSize];
byte[] buffer = new byte[bufferSize];
var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);

if (!receiveResult.CloseStatus.HasValue)
Expand All @@ -274,7 +244,7 @@ await webSocket.CloseAsync(
ms.Seek(0, SeekOrigin.Begin);

using var sr = new StreamReader(ms);
var msg = await sr.ReadToEndAsync();
string msg = await sr.ReadToEndAsync();
var deserialized = JObject.Parse(msg);
return deserialized;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ public ContactInfoMap()

private sealed class ContactDeleteArgument
{
// Pragma disable reason: used implicitly
#pragma warning disable S3459
// ReSharper disable once InconsistentNaming // kentico naming convention
public Guid ContactGUID { get; set; }
#pragma warning restore S3459
};

private sealed class SimplifiedMap : ClassMap<ContactDeleteArgument>
Expand Down Expand Up @@ -74,7 +78,7 @@ private async Task BulkDeleteContactFromCsvAsync(Stream csvStream, ImportContext
{
var config = new CsvConfiguration(CultureInfo.InvariantCulture)
{
Delimiter = context.Delimiter, //";" // TODO tomas.krch: 2023-07-12 to config/dialog
Delimiter = context.Delimiter,
PrepareHeaderForMatch = args => args.Header.ToLower(),
};

Expand All @@ -83,23 +87,18 @@ private async Task BulkDeleteContactFromCsvAsync(Stream csvStream, ImportContext

csv.Context.RegisterClassMap<SimplifiedMap>();

var records = csv.GetRecords<ContactDeleteArgument>();
var records = csv.GetRecordsAsync<ContactDeleteArgument>();

int totalProcessed = 0;

async IAsyncEnumerable<List<Guid>> Pipe2TransformBatches(IEnumerable<ContactDeleteArgument> models)
async IAsyncEnumerable<List<Guid>> Pipe2TransformBatches(IAsyncEnumerable<ContactDeleteArgument> models)
{
var currentBatch = new List<Guid>(context.BatchSize);

foreach (var item in models)
await foreach (var item in models)
{
try
{
// if (contactGuids.Contains(item.ContactGUID))
// {
// continue;
// }

currentBatch.Add(item.ContactGUID);
}
catch (Exception ex)
Expand Down Expand Up @@ -174,7 +173,7 @@ private async Task InsertContactsFromCsvAsync(Stream csvStream, ImportContext co
csv.Context.RegisterClassMap<ContactInfoMap>();

var records = csv.GetRecords<ContactInfo>();
var totalProcessed = 0;
int totalProcessed = 0;

IEnumerable<List<(ContactInfo info, bool insert)>> Pipe2TransformBatches(IEnumerable<ContactInfo> models)
{
Expand Down Expand Up @@ -216,14 +215,8 @@ private async Task InsertContactsFromCsvAsync(Stream csvStream, ImportContext co
LogEvents = false,
})
{
// insert is not immediate (all items stored in memory before insert) so direct piping is not possible
// ContactInfoProvider.ProviderObject.BulkInsertInfos(Pipe2Transform(records), new BulkInsertSettings
// {
// BatchSize = context.BatchSize,
// Options = SqlBulkCopyOptions.Default,
// });

// Task? previousInsertGroupMemberBatch = null;
// we cannot use ContactInfoProvider.ProviderObject.BulkInsertInfos - insert is not immediate (all items stored in memory before insert) so direct piping is not possible

foreach (var contactBatch in Pipe2TransformBatches(records))
{
ContactInfoProvider.ProviderObject.BulkInsertInfos(contactBatch.Where(x => x.insert).Select(x => x.info), new BulkInsertSettings
Expand All @@ -234,28 +227,17 @@ private async Task InsertContactsFromCsvAsync(Stream csvStream, ImportContext co

if (group != null)
{
// if (previousInsertGroupMemberBatch != null)
// {
// await previousInsertGroupMemberBatch;
// }

// previousInsertGroupMemberBatch =
// cannot employ async insert, it is not stable (bricks bulk contact sql connection)
await InsertGroupMembersAsync(contactBatch.Select(x => x.info.ContactGUID), group);
}
}

// if (previousInsertGroupMemberBatch != null)
// {
// await previousInsertGroupMemberBatch;
// }
}
}

private Task InsertGroupMembersAsync(IEnumerable<Guid> contactGuids, ContactGroupInfo group) =>
Task.Run(() =>
{
var query = @"
string query = @"
INSERT INTO [dbo].[OM_ContactGroupMember] ([ContactGroupMemberContactGroupID], [ContactGroupMemberType], [ContactGroupMemberRelatedID],
[ContactGroupMemberFromCondition], [ContactGroupMemberFromAccount], [ContactGroupMemberFromManual])
-- OUTPUT [inserted].[ContactGroupMemberContactGroupID]
Expand All @@ -270,7 +252,7 @@ FROM [dbo].[OM_ContactGroupMember] [CGM]
AND [CGM].[ContactGroupMemberRelatedID] = [C].[ContactID])
";
var jsonGuidArray = JsonConvert.SerializeObject(contactGuids);
string jsonGuidArray = JsonConvert.SerializeObject(contactGuids);
ConnectionHelper.ExecuteNonQuery(query, new QueryDataParameters
{
Expand All @@ -283,6 +265,7 @@ FROM [dbo].[OM_ContactGroupMember] [CGM]
private Task DeletedContactsAsync(List<Guid> contactGuids, int batchLimit)
{
// for future implementation of bulk delete
#pragma warning disable S125
// var query = @"WITH [CTE]([Guid])
// AS
// (SELECT CAST([l].[value] AS UNIQUEIDENTIFIER) [Guid]
Expand All @@ -291,13 +274,16 @@ private Task DeletedContactsAsync(List<Guid> contactGuids, int batchLimit)
// FROM [dbo].[OM_Contact]
// OUTPUT [deleted].[ContactID]
// WHERE EXISTS (SELECT 1 FROM [CTE] WHERE [CTE].[Guid] = [ContactGUID])";
#pragma warning restore S125
if (contactGuids.Count == 0)
{
return Task.CompletedTask;
}

return Task.Run(() =>
{
var jsonGuidArray = JsonConvert.SerializeObject(contactGuids);
var whereCondition = $"""
string jsonGuidArray = JsonConvert.SerializeObject(contactGuids);
string whereCondition = $"""
EXISTS (SELECT 1 FROM OPENJSON('{jsonGuidArray}', '$') [l] WHERE CAST([l].[value] AS UNIQUEIDENTIFIER) = [ContactGUID])
""";
Expand Down

0 comments on commit 0a63e65

Please sign in to comment.