forked from akkadotnet/Akka.Hosting
-
Notifications
You must be signed in to change notification settings - Fork 0
/
AkkaClusterHostingExtensions.cs
1954 lines (1787 loc) · 92.5 KB
/
AkkaClusterHostingExtensions.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Cluster.Hosting.SBR;
using Akka.Cluster.Sharding;
using Akka.Cluster.Tools.Client;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Akka.Coordination;
using Akka.DependencyInjection;
using Akka.Discovery;
using Akka.Hosting;
using Akka.Hosting.Coordination;
using Akka.Persistence.Hosting;
using Akka.Remote.Hosting;
#nullable enable
namespace Akka.Cluster.Hosting
{
/// <summary>
/// The set of options for enabling Akka.Cluster support.
/// </summary>
public sealed class ClusterOptions
{
/// <summary>
/// The akka.cluster.roles values.
/// </summary>
public string[]? Roles { get; set; }
/// <summary>
/// Optional cluster role check to consider if a specific cluster role have enough
/// members to be considered to be up. The default value is 1 node per role.
/// </summary>
public Dictionary<string, int>? MinimumNumberOfMembersPerRole { get; set; }
/// <summary>
/// If populated, the akka.cluster.seed-nodes that will be used.
/// </summary>
public string[]? SeedNodes { get; set; }
/// <summary>
/// <para>
/// Minimum required number of members before the leader changes member status
/// of 'Joining' members to 'Up'. Typically used together with
/// <see cref="Cluster.RegisterOnMemberUp"/> to defer some action, such as starting actors,
/// until the cluster has reached a certain size.
/// </para>
/// <b>Default:</b> 1
/// </summary>
public int? MinimumNumberOfMembers { get; set; }
/// <summary>
/// <para>
/// Application version of the deployment. Used by rolling update features
/// to distinguish between old and new nodes. The typical convention is to use
/// 3 digit version numbers `major.minor.patch`, but 1 or two digits are also
/// supported.
/// </para>
/// <para>
/// If no `.` is used it is interpreted as a single digit version number or as
/// plain alphanumeric if it couldn't be parsed as a number.
/// </para>
/// <para>
/// It may also have a qualifier at the end for 2 or 3 digit version numbers such
/// as "1.2-RC1".<br/>
/// For 1 digit with qualifier, 1-RC1, it is interpreted as plain alphanumeric.
/// </para>
/// <para>
/// It has support for https://github.com/dwijnand/sbt-dynver format with `+` or
/// `-` separator. The number of commits from the tag is handled as a numeric part.
/// For example `1.0.0+3-73475dce26` is less than `1.0.10+10-ed316bd024` (3 < 10).
/// </para>
/// <para>
/// Values can be "assembly-version" or a version string as defined above, i.e.<br/>
/// app-version = "1.0.0"<br/>
/// app-version = "1.1-beta1"<br/>
/// app-version = "1"<br/>
/// app-version = "1.1"<br/>
/// </para>
/// <b>Default:</b> by default the app-version will default to the entry assembly's version,
/// i.e. the assembly of the executable running `Program.cs`
/// </summary>
public string? AppVersion { get; set; }
/// <summary>
/// <para>
/// Enable/disable info level logging of cluster events
/// </para>
/// <b>Default:</b> <c>true</c>
/// </summary>
public bool? LogInfo { get; set; }
/// <summary>
/// <para>
/// Enable/disable verbose info-level logging of cluster events for temporary troubleshooting.
/// </para>
/// <b>Default:</b> <c>false</c>
/// </summary>
public bool? LogInfoVerbose { get; set; }
/// <summary>
/// Split brain resolver configuration parameters. This can be an instance of one of these classes:
/// <list type="bullet">
/// <item><see cref="StaticQuorumOption"/></item>
/// <item><see cref="KeepMajorityOption"/></item>
/// <item><see cref="KeepOldestOption"/></item>
/// <item><see cref="LeaseMajorityOption"/></item>
/// </list>
/// To use the default split brain resolver options, use <see cref="SplitBrainResolverOption.Default"/> which
/// uses the keep majority resolving strategy.
/// </summary>
public SplitBrainResolverOption? SplitBrainResolver { get; set; }
/// <summary>
/// <para>
/// Settings for the failure detector used by the cluster subsystem to detect unreachable members.
/// </para>
/// </summary>
public PhiAccrualFailureDetectorOptions? FailureDetector { get; set; }
}
public sealed class ClusterSingletonOptions
{
/// <summary>
/// <para>
/// The number of messages <see cref="ClusterSingletonProxy"/> will buffer when the cluster singleton
/// location is unknown. Older messages will be dropped on buffer overflow. Setting this property to 0
/// will disable the buffer.
/// </para>
/// <b>Valid values:</b> 0 - 10000<br/>
/// <b>Default:</b> 1000
/// </summary>
public int? BufferSize { get; set; } = null;
/// <summary>
/// If set, the singleton will only be instantiated on nodes set with the role name.
/// </summary>
public string? Role { get; set; }
/// <summary>
/// When handing over to a new oldest node this <see cref="TerminationMessage"/> is sent to the singleton actor
/// to tell it to finish its work, close resources, and stop. The hand-over to the new oldest node
/// is completed when the singleton actor is terminated. Note that <see cref="PoisonPill"/> is a
/// perfectly fine <see cref="TerminationMessage"/> if you only need to stop the actor.
/// </summary>
public object? TerminationMessage { get; set; }
/// <summary>
/// An class instance that extends <see cref="LeaseOptionBase"/>, used to configure the lease provider used in this
/// cluster singleton.
/// </summary>
public LeaseOptionBase? LeaseImplementation { get; set; }
/// <summary>
/// The interval between retries for acquiring the lease
/// </summary>
public TimeSpan? LeaseRetryInterval { get; set; }
/// <summary>
/// Interval at which the proxy will try to resolve the singleton instance.
/// </summary>
public TimeSpan? SingletonIdentificationInterval { get; set; }
/// <summary>
/// Should the singleton proxy publish a warning if no singleton actor were found after a period of time
/// </summary>
public bool? LogSingletonIdentificationFailure { get; set; }
/// <summary>
/// The period the proxy will wait until it logs a missing singleton warning, defaults to 1 minute
/// </summary>
public TimeSpan? SingletonIdentificationFailurePeriod { get; set; }
internal ClusterSingletonManagerSettings ToManagerSettings(string singletonName, ActorSystem system)
{
var settings = ClusterSingletonManagerSettings.Create(system);
var retry = LeaseRetryInterval ?? system.Settings.Config.GetTimeSpan("akka.cluster.singleton.lease-retry-interval");
var leaseSettings = LeaseImplementation is not null
? new LeaseUsageSettings(LeaseImplementation.ConfigPath, retry)
: null;
return new ClusterSingletonManagerSettings(
singletonName: singletonName,
role: Role ?? settings.Role,
removalMargin: settings.RemovalMargin,
handOverRetryInterval: settings.HandOverRetryInterval,
leaseSettings: leaseSettings ?? settings.LeaseSettings,
considerAppVersion: false);
}
internal ClusterSingletonProxySettings ToProxySettings(string singletonName, ActorSystem system)
{
var settings = ClusterSingletonProxySettings.Create(system);
return new ClusterSingletonProxySettings(
singletonName: singletonName,
role: Role ?? settings.Role,
singletonIdentificationInterval: SingletonIdentificationInterval ?? settings.SingletonIdentificationInterval,
bufferSize: BufferSize ?? settings.BufferSize,
considerAppVersion: settings.ConsiderAppVersion,
logSingletonIdentificationFailure: LogSingletonIdentificationFailure ?? settings.LogSingletonIdentificationFailure,
singletonIdentificationFailurePeriod: SingletonIdentificationFailurePeriod ?? settings.SingletonIdentificationFailurePeriod);
}
}
public sealed class ShardOptions
{
/// <summary>
/// <para>
/// Defines how the coordinator stores its state. The same setting is also used by the
/// shards when <see cref="RememberEntities"/> is set to <c>true</c>.
/// </para>
///
/// Possible values are <see cref="Akka.Cluster.Sharding.StateStoreMode.Persistence"/> and
/// <see cref="Akka.Cluster.Sharding.StateStoreMode.DData"/>
/// </summary>
public StateStoreMode? StateStoreMode { get; set; }
/// <summary>
/// <para>
/// When <see cref="RememberEntities"/> is enabled and the state store mode is
/// <see cref="Akka.Cluster.Sharding.StateStoreMode.DData"/>, this controls how the remembered entities
/// and shards are stored.
/// </para>
///
/// <para>
/// Possible values are <see cref="Akka.Cluster.Sharding.RememberEntitiesStore.Eventsourced"/> and
/// <see cref="Akka.Cluster.Sharding.RememberEntitiesStore.DData"/>
/// </para>
/// </summary>
public RememberEntitiesStore? RememberEntitiesStore { get; set; }
/// <summary>
/// When set to <c>true</c>, the active entity actors will automatically be restarted
/// upon Shard restart. i.e. if the Shard is started on a different ShardRegion
/// due to re-balance or crash.
/// </summary>
public bool? RememberEntities { get; set; }
/// <summary>
/// Specifies that entities should be instantiated on cluster nodes with a specific role.
/// If not specified, all nodes in the cluster are used.
/// </summary>
public string? Role { get; set; }
/// <summary>
/// <para>
/// The journal plugin configuration identifier used by persistence mode, eg. "sql-server" or
/// "postgresql".<br/>
/// You only need to declare <see cref="JournalPluginId"/> or <see cref="JournalOptions"/>,
/// <see cref="JournalOptions"/> Identifier will be used if both are declared.
/// </para>
/// <b>NOTE</b> This setting is only used when <see cref="StateStoreMode"/> is set to
/// <see cref="Akka.Cluster.Sharding.StateStoreMode.Persistence"/>
/// </summary>
public string? JournalPluginId { get; set; }
/// <summary>
/// <para>
/// The journal plugin options used by persistence mode, eg. <c>SqlServerJournalOptions</c>
/// or <c>PostgreSqlJournalOptions</c>.<br/>
/// You only need to declare <see cref="JournalPluginId"/> or <see cref="JournalOptions"/>,
/// <see cref="JournalOptions"/> Identifier will be used if both are declared.
/// </para>
/// <b>NOTE</b> This setting is only used when <see cref="StateStoreMode"/> is set to
/// <see cref="Akka.Cluster.Sharding.StateStoreMode.Persistence"/>
/// </summary>
public JournalOptions? JournalOptions { get; set; }
/// <summary>
/// <para>
/// The snapshot store plugin configuration identifier used by persistence mode, eg. "sql-server" or
/// "postgresql".<br/>
/// You only need to declare <see cref="SnapshotPluginId"/> or <see cref="SnapshotOptions"/>,
/// <see cref="SnapshotOptions"/> Identifier will be used if both are declared.
/// </para>
/// <b>NOTE</b> This setting is only used when <see cref="StateStoreMode"/> is set to
/// <see cref="Akka.Cluster.Sharding.StateStoreMode.Persistence"/>
/// </summary>
public string? SnapshotPluginId { get; set; }
/// <summary>
/// <para>
/// The snapshot store plugin options used by persistence mode, eg. <c>SqlServerSnapshotOptions</c>
/// or <c>PostgreSqlSnapshotOptions</c>.<br/>
/// You only need to declare <see cref="SnapshotPluginId"/> or <see cref="SnapshotOptions"/>,
/// <see cref="SnapshotOptions"/> Identifier will be used if both are declared.
/// </para>
/// <b>NOTE</b> This setting is only used when <see cref="StateStoreMode"/> is set to
/// <see cref="Akka.Cluster.Sharding.StateStoreMode.Persistence"/>
/// </summary>
public SnapshotOptions? SnapshotOptions { get; set; }
/// <summary>
/// An class instance that extends <see cref="LeaseOptionBase"/>, used to configure the lease provider used in this
/// sharding region.
/// </summary>
public LeaseOptionBase? LeaseImplementation { get; set; }
/// <summary>
/// The interval between retries for acquiring the lease
/// </summary>
public TimeSpan? LeaseRetryInterval { get; set; }
/// <summary>
/// The message that will be sent to entities when they are to be stopped for a rebalance or
/// graceful shutdown of a <see cref="Sharding.ShardRegion"/>, e.g. <see cref="PoisonPill"/>.
/// </summary>
public object? HandOffStopMessage { get; set; }
/// <summary>
/// Throw an exception if the internal state machine in the Shard actor does an invalid state transition.
/// Mostly for the Akka test suite, if off the invalid transition is logged as a warning instead of throwing and
/// crashing the shard.
/// </summary>
public bool? FailOnInvalidEntityStateTransition { get; set; }
/// <summary>
/// <para>
/// Settings for the Distributed Data replicator.
/// The <see cref="ShardingDDataOptions.Role"/> property is not used. The distributed-data
/// role will be the same as <see cref="ShardOptions.Role"/>.
/// Note that there is one Replicator per role and it's not possible
/// to have different distributed-data settings for different sharding entity types.
/// </para>
/// <b>NOTE</b> This setting is only used when <see cref="StateStoreMode"/> is set to
/// <see cref="Akka.Cluster.Sharding.StateStoreMode.DData"/>
/// </summary>
[Obsolete("This property is not being applied to the ActorSystem anymore. " +
"Use `WithShardingDistributedData()` extension method or set them using manual HOCON " +
"configuration to set \"akka.cluster.sharding.distributed-data\" values. " +
"Since v1.5.27")]
public ShardingDDataOptions DistributedData { get; } = new();
/// <summary>
/// Set this to false to disable idle entity passivation. When set to <c>false</c>,
/// will always override <see cref="PassivateIdleEntityAfter"/>
/// </summary>
public bool? ShouldPassivateIdleEntities { get; set; }
/// <summary>
/// Set this to a time duration to have sharding passivate entities when they have not
/// received any message in this length of time.
/// It is always disabled if <see cref="RememberEntities"/> is enabled
/// or <see cref="ShouldPassivateIdleEntities"/> is set to false.
/// </summary>
public TimeSpan? PassivateIdleEntityAfter { get; set; }
public TimeSpan? ShardRegionQueryTimeout { get; set; }
public override string ToString()
{
var sb = new StringBuilder();
if (Role is not null)
sb.AppendLine($"role = {Role.ToHocon()}");
if(RememberEntities is not null)
sb.AppendLine($"remember-entities = {RememberEntities.ToHocon()}");
if(RememberEntitiesStore is not null)
sb.AppendLine($"remember-entities-store = {RememberEntitiesStore.ToString().ToLowerInvariant().ToHocon()}");
var journalId = JournalOptions?.PluginId ?? JournalPluginId ?? null;
if (journalId is not null)
sb.AppendLine($"journal-plugin-id = {journalId.ToHocon()}");
var snapshotId = SnapshotOptions?.PluginId ?? SnapshotPluginId ?? null;
if (snapshotId is not null)
sb.AppendLine($"snapshot-plugin-id = {snapshotId.ToHocon()}");
if (StateStoreMode is not null)
sb.AppendLine($"state-store-mode = {StateStoreMode.ToString().ToLowerInvariant().ToHocon()}");
if (LeaseImplementation is not null)
sb.AppendLine($"use-lease = {LeaseImplementation.ConfigPath}");
if (LeaseRetryInterval is not null)
sb.AppendLine($"lease-retry-interval = {LeaseRetryInterval.ToHocon()}");
if (FailOnInvalidEntityStateTransition is not null)
sb.AppendLine(
$"fail-on-invalid-entity-state-transition = {FailOnInvalidEntityStateTransition.ToHocon()}");
if(ShouldPassivateIdleEntities is false)
sb.AppendLine("passivate-idle-entity-after = off");
else if(PassivateIdleEntityAfter is not null)
sb.AppendLine($"passivate-idle-entity-after = {PassivateIdleEntityAfter.ToHocon()}");
if (ShardRegionQueryTimeout is not null)
sb.AppendLine($"shard-region-query-timeout = {ShardRegionQueryTimeout.ToHocon()}");
return sb.ToString();
}
}
public sealed class ShardingDDataOptions : DDataOptions
{
public int? MajorityMinimumCapacity { get; set; }
public int? MaxDeltaElements { get; set; }
internal void Apply(AkkaConfigurationBuilder builder)
{
base.Apply(builder, "akka.cluster.sharding");
var sb = new StringBuilder();
if (MajorityMinimumCapacity is not null)
sb.AppendLine($"majority-min-cap = {MajorityMinimumCapacity}");
if (MaxDeltaElements is not null)
sb.AppendLine($"max-delta-elements = {MaxDeltaElements}");
if(sb.Length == 0)
return;
sb.Insert(0, "akka.cluster.sharding.distributed-data {");
sb.AppendLine("}");
builder.AddHocon(sb.ToString(), HoconAddMode.Prepend);
}
}
public class DDataOptions
{
/// <summary>
/// <para>
/// Actor name of the Replicator actor.
/// </para>
/// <b>Default</b>: "ddataReplicator"
/// </summary>
public string? Name { get; set; }
/// <summary>
/// Replicas are running on members tagged with this role.
/// All members are used if null or empty.
/// </summary>
public string? Role { get; set; }
/// <summary>
/// When set to <c>true</c>, this flag will attach a backoff supervisor to the replicator;
/// any failing replicator to be restarted
/// </summary>
public bool? RecreateOnFailure { get; set; }
/// <summary>
/// When set to <c>true</c>, Update and Get operations are sent to oldest nodes first.
/// This is useful together with Cluster Singleton, which is running on oldest nodes.
/// </summary>
public bool? PreferOldest { get; set; }
/// <summary>
/// When set to <c>true</c>, provide a higher level of details in the debug logs, including gossip status.
/// Be careful about enabling in production systems.
/// </summary>
public bool? VerboseDebugLogging { get; set; }
public DurableOptions Durable { get; set; } = new();
internal virtual void Apply(AkkaConfigurationBuilder builder, string prefix = "akka.cluster")
{
var sb = new StringBuilder();
if (Name is not null)
sb.AppendLine($"name = {Name.ToHocon()}");
if (Role is not null)
sb.AppendLine($"role = {Role.ToHocon()}");
if (RecreateOnFailure is not null)
sb.AppendLine($"recreate-on-failure = {RecreateOnFailure.ToHocon()}");
if (PreferOldest is not null)
sb.AppendLine($"prefer-oldest = {PreferOldest.ToHocon()}");
if (VerboseDebugLogging is not null)
sb.AppendLine($"verbose-debug-logging = {VerboseDebugLogging.ToHocon()}");
var durableSb = new StringBuilder();
if (Durable.Keys is not null)
durableSb.AppendLine($"keys = [{string.Join(",", Durable.Keys.Select(s => s.ToHocon()))}]");
var lmdbSb = new StringBuilder();
var lmdb = Durable.Lmdb;
if (lmdb.Directory is not null)
lmdbSb.AppendLine($"dir = {lmdb.Directory.ToHocon()}");
if (lmdb.MapSize is not null)
lmdbSb.AppendLine($"map-size = {lmdb.MapSize}");
if (lmdb.WriteBehindInterval is not null)
lmdbSb.AppendLine($"write-behind-interval = {lmdb.WriteBehindInterval.ToHocon()}");
if (lmdbSb.Length > 0)
{
durableSb
.AppendLine("lmdb {")
.AppendLine(lmdbSb.ToString())
.AppendLine("}");
}
if (durableSb.Length > 0)
{
sb.AppendLine("durable {")
.AppendLine(durableSb.ToString())
.AppendLine("}");
}
if(sb.Length == 0)
return;
sb.Insert(0, $"{prefix}.distributed-data {{");
sb.AppendLine("}");
builder.AddHocon(sb.ToString(), HoconAddMode.Prepend);
}
}
public class DurableOptions
{
/// <summary>
/// List of keys that are durable. Prefix matching is supported by using * at the
/// end of a key.
/// </summary>
public string[]? Keys { get; set; }
public LmdbOptions Lmdb { get; set; } = new();
}
public class LmdbOptions
{
/// <summary>
/// Directory of LMDB file. There are two options:
/// <list type="number">
/// <item>
/// A relative or absolute path to a directory that ends with 'ddata'
/// the full name of the directory will contain name of the ActorSystem
/// and its remote port.
/// </item>
/// <item>
/// Otherwise the path is used as is, as a relative or absolute path to
/// a directory.
/// </item>
/// </list>
/// When running in production you may want to configure this to a specific
/// path (alt 2), since the default directory contains the remote port of the
/// actor system to make the name unique. If using a dynamically assigned
/// port (0) it will be different each time and the previously stored data
/// will not be loaded.
/// </summary>
public string? Directory { get; set; }
/// <summary>
/// Size in bytes of the memory mapped file.
/// </summary>
public long? MapSize { get; set; }
/// <summary>
/// Accumulate changes before storing improves performance with the
/// risk of losing the last writes if the process crashes.
/// The interval is by default set to 0 to write each update immediately.
/// Enabling write behind by specifying a duration, e.g. 200ms, is especially
/// efficient when performing many writes to the same key, because it is only
/// the last value for each key that will be serialized and stored.
/// </summary>
public TimeSpan? WriteBehindInterval { get; set; }
}
public static class AkkaClusterHostingExtensions
{
internal static AkkaConfigurationBuilder BuildClusterHocon(
this AkkaConfigurationBuilder builder,
ClusterOptions? options)
{
if (options == null)
return builder.AddHocon(ClusterSharding.DefaultConfig()
.WithFallback(ClusterSingletonManager.DefaultConfig())
.WithFallback(DistributedPubSub.DefaultConfig())
.WithFallback(ClusterClientReceptionist.DefaultConfig())
.WithFallback(DistributedData.DistributedData.DefaultConfig()), HoconAddMode.Append);
var sb = new StringBuilder()
.AppendLine("akka.cluster {");
if (options.Roles is { Length: > 0 })
{
sb.AppendLine($"roles = [{string.Join(",", options.Roles)}]");
}
if (options.MinimumNumberOfMembersPerRole is { Count: > 0 })
{
sb.AppendLine("role {");
foreach (var kvp in options.MinimumNumberOfMembersPerRole)
{
sb.AppendLine($"{kvp.Key}.min-nr-of-members = {kvp.Value}");
}
sb.AppendLine("}");
}
if (options.SeedNodes is { Length: > 0 })
{
// Validate that all addresses are valid.
sb.Append("seed-nodes = [");
foreach (var addrString in options.SeedNodes)
{
Address.Parse(addrString);
sb.Append($"{addrString.ToHocon()}, ");
}
sb.AppendLine("]");
}
if (options.MinimumNumberOfMembers is not null)
sb.AppendLine($"min-nr-of-members = {options.MinimumNumberOfMembers}");
if (options.AppVersion is not null)
sb.AppendLine($"app-version = {options.AppVersion.ToHocon()}");
if (options.LogInfo is not null)
sb.AppendLine($"log-info = {options.LogInfo.ToHocon()}");
if (options.LogInfoVerbose is not null)
sb.AppendLine($"log-info-verbose = {options.LogInfoVerbose.ToHocon()}");
if (options.FailureDetector is not null)
{
var fsb = options.FailureDetector.ToHocon();
if (fsb.Length > 0)
{
sb.AppendLine("failure-detector {\n");
sb.Append(fsb);
sb.AppendLine("}");
}
}
sb.AppendLine("}");
// prepend the composed configuration
builder.AddHocon(sb.ToString(), HoconAddMode.Prepend);
options.SplitBrainResolver?.Apply(builder);
// populate all of the possible Clustering default HOCON configurations here
return builder.AddHocon(ClusterSharding.DefaultConfig()
.WithFallback(ClusterSingletonManager.DefaultConfig())
.WithFallback(DistributedPubSub.DefaultConfig())
.WithFallback(ClusterClientReceptionist.DefaultConfig())
.WithFallback(DistributedData.DistributedData.DefaultConfig()), HoconAddMode.Append);
}
/// <summary>
/// Adds Akka.Cluster support to the <see cref="ActorSystem"/>.
/// </summary>
/// <param name="builder">
/// The builder instance being configured.
/// </param>
/// <param name="options">
/// Optional. Akka.Cluster configuration parameters.
/// </param>
/// <returns>
/// The same <see cref="AkkaConfigurationBuilder"/> instance originally passed in.
/// </returns>
public static AkkaConfigurationBuilder WithClustering(
this AkkaConfigurationBuilder builder,
ClusterOptions? options = null)
{
var hoconBuilder = BuildClusterHocon(builder, options);
if (builder.ActorRefProvider.HasValue)
{
switch (builder.ActorRefProvider.Value)
{
case ProviderSelection.Cluster:
case ProviderSelection.Custom:
return hoconBuilder; // no-op
}
}
return hoconBuilder.WithActorRefProvider(ProviderSelection.Cluster.Instance);
}
public static AkkaConfigurationBuilder WithDistributedData(
this AkkaConfigurationBuilder builder,
Action<DDataOptions> configurator)
{
var options = new DDataOptions();
configurator(options);
return builder.WithDistributedData(options);
}
public static AkkaConfigurationBuilder WithDistributedData(
this AkkaConfigurationBuilder builder,
DDataOptions options)
{
options.Apply(builder);
builder.AddHocon(DistributedData.DistributedData.DefaultConfig(), HoconAddMode.Append);
return builder;
}
/// <summary>
/// Configure the global sharding distributed data settings. This settings will only be used when ShardOptions.StateStoreMode.
/// </summary>
/// <param name="builder">
/// The builder instance being configured.
/// </param>
/// <param name="configure">
/// Configuration method for configuring the <see cref="ShardingDDataOptions"/>
/// </param>
/// <returns>
/// The same <see cref="AkkaConfigurationBuilder"/> instance originally passed in.
/// </returns>
public static AkkaConfigurationBuilder WithShardingDistributedData(
this AkkaConfigurationBuilder builder,
Action<ShardingDDataOptions> configure)
{
var options = new ShardingDDataOptions();
configure(options);
return builder.WithShardingDistributedData(options);
}
/// <summary>
/// Configure the global sharding distributed data settings
/// </summary>
/// <param name="builder">
/// The builder instance being configured.
/// </param>
/// <param name="options">
/// The <see cref="ShardingDDataOptions"/> that will be used to configure cluster sharding
/// global distributed data settings
/// </param>
/// <returns>
/// The same <see cref="AkkaConfigurationBuilder"/> instance originally passed in.
/// </returns>
public static AkkaConfigurationBuilder WithShardingDistributedData(
this AkkaConfigurationBuilder builder,
ShardingDDataOptions options)
{
options.Apply(builder);
var dDataSettings = DistributedData.DistributedData.DefaultConfig()
.MoveTo("akka.cluster.sharding.distributed-data");
builder.AddHocon(dDataSettings, HoconAddMode.Append);
return builder;
}
/// <summary>
/// Starts a <see cref="ShardRegion"/> actor for the given entity <see cref="typeName"/>
/// and registers the ShardRegion <see cref="IActorRef"/> with <see cref="TKey"/> in the
/// <see cref="ActorRegistry"/> for this <see cref="ActorSystem"/>.
/// </summary>
/// <param name="builder">
/// The builder instance being configured.
/// </param>
/// <param name="typeName">
/// The name of the entity type
/// </param>
/// <param name="entityPropsFactory">
/// Function that, given an entity id, returns the <see cref="Actor.Props"/> of the entity actors that will be created by the <see cref="Sharding.ShardRegion"/>
/// </param>
/// <param name="messageExtractor">
/// Functions to extract the entity id, shard id, and the message to send to the entity from the incoming message.
/// </param>
/// <param name="shardOptions">
/// The set of options for configuring <see cref="ClusterShardingSettings"/>
/// </param>
/// <typeparam name="TKey">
/// The type key to use to retrieve the <see cref="IActorRef"/> for this <see cref="ShardRegion"/>.
/// </typeparam>
/// <returns>
/// The same <see cref="AkkaConfigurationBuilder"/> instance originally passed in.
/// </returns>
public static AkkaConfigurationBuilder WithShardRegion<TKey>(
this AkkaConfigurationBuilder builder,
string typeName,
Func<string, Props> entityPropsFactory,
IMessageExtractor messageExtractor,
ShardOptions shardOptions)
{
return builder.WithShardRegion<TKey>(typeName, (_, _, _) => entityPropsFactory,
messageExtractor, shardOptions);
}
/// <summary>
/// Starts a <see cref="ShardRegion"/> actor for the given entity <see cref="typeName"/>
/// and registers the ShardRegion <see cref="IActorRef"/> with <see cref="TKey"/> in the
/// <see cref="ActorRegistry"/> for this <see cref="ActorSystem"/>.
/// </summary>
/// <param name="builder">
/// The builder instance being configured.
/// </param>
/// <param name="typeName">
/// The name of the entity type
/// </param>
/// <param name="entityPropsFactory">
/// Function that, given an entity id, returns the <see cref="Actor.Props"/> of the entity actors that will be created by the <see cref="Sharding.ShardRegion"/>
/// </param>
/// <param name="extractEntityId">
/// Partial function to extract the entity id and the message to send to the entity from the incoming message,
/// if the partial function does not match the message will be `unhandled`,
/// i.e.posted as `Unhandled` messages on the event stream
/// </param>
/// <param name="extractShardId">
/// Function to determine the shard id for an incoming message, only messages that passed the `extractEntityId` will be used
/// </param>
/// <param name="shardOptions">
/// The set of options for configuring <see cref="ClusterShardingSettings"/>
/// </param>
/// <typeparam name="TKey">
/// The type key to use to retrieve the <see cref="IActorRef"/> for this <see cref="ShardRegion"/>.
/// </typeparam>
/// <returns>
/// The same <see cref="AkkaConfigurationBuilder"/> instance originally passed in.
/// </returns>
[Obsolete("Use IMessageExtractor instead of the ExtractEntityId and ExtractShardId delegates.")]
public static AkkaConfigurationBuilder WithShardRegion<TKey>(
this AkkaConfigurationBuilder builder,
string typeName,
Func<string, Props> entityPropsFactory,
ExtractEntityId extractEntityId,
ExtractShardId extractShardId,
ShardOptions shardOptions)
{
return builder.WithShardRegion<TKey>(typeName, (_, _, _) => entityPropsFactory,
extractEntityId, extractShardId, shardOptions);
}
/// <summary>
/// Starts a <see cref="ShardRegion"/> actor for the given entity <see cref="typeName"/>
/// and registers the ShardRegion <see cref="IActorRef"/> with <see cref="TKey"/> in the
/// <see cref="ActorRegistry"/> for this <see cref="ActorSystem"/>.
/// </summary>
/// <param name="builder">
/// The builder instance being configured.
/// </param>
/// <param name="typeName">
/// The name of the entity type
/// </param>
/// <param name="entityPropsFactory">
/// Function that, given an entity id, returns the <see cref="Actor.Props"/> of the entity actors that will be created by the <see cref="Sharding.ShardRegion"/>.
///
/// This function also accepts the <see cref="ActorSystem"/> and the <see cref="IActorRegistry"/> as inputs.
/// </param>
/// <param name="messageExtractor">
/// Functions to extract the entity id, shard id, and the message to send to the entity from the incoming message.
/// </param>
/// <param name="shardOptions">
/// The set of options for configuring <see cref="ClusterShardingSettings"/>
/// </param>
/// <typeparam name="TKey">
/// The type key to use to retrieve the <see cref="IActorRef"/> for this <see cref="ShardRegion"/>.
/// </typeparam>
/// <returns>
/// The same <see cref="AkkaConfigurationBuilder"/> instance originally passed in.
/// </returns>
public static AkkaConfigurationBuilder WithShardRegion<TKey>(
this AkkaConfigurationBuilder builder,
string typeName,
Func<ActorSystem, IActorRegistry, Func<string, Props>> entityPropsFactory,
IMessageExtractor messageExtractor,
ShardOptions shardOptions)
{
return builder.WithShardRegion<TKey>(typeName,
(system, registry, _) => entityPropsFactory(system, registry),
messageExtractor, shardOptions);
}
/// <summary>
/// Starts a <see cref="ShardRegion"/> actor for the given entity <see cref="typeName"/>
/// and registers the ShardRegion <see cref="IActorRef"/> with <see cref="TKey"/> in the
/// <see cref="ActorRegistry"/> for this <see cref="ActorSystem"/>.
/// </summary>
/// <param name="builder">
/// The builder instance being configured.
/// </param>
/// <param name="typeName">
/// The name of the entity type
/// </param>
/// <param name="entityPropsFactory">
/// Function that, given an entity id, returns the <see cref="Actor.Props"/> of the entity actors that will be created by the <see cref="Sharding.ShardRegion"/>.
///
/// This function also accepts the <see cref="ActorSystem"/> and the <see cref="IActorRegistry"/> as inputs.
/// </param>
/// <param name="extractEntityId">
/// Partial function to extract the entity id and the message to send to the entity from the incoming message,
/// if the partial function does not match the message will be `unhandled`,
/// i.e.posted as `Unhandled` messages on the event stream
/// </param>
/// <param name="extractShardId">
/// Function to determine the shard id for an incoming message, only messages that passed the `extractEntityId` will be used
/// </param>
/// <param name="shardOptions">
/// The set of options for configuring <see cref="ClusterShardingSettings"/>
/// </param>
/// <typeparam name="TKey">
/// The type key to use to retrieve the <see cref="IActorRef"/> for this <see cref="ShardRegion"/>.
/// </typeparam>
/// <returns>
/// The same <see cref="AkkaConfigurationBuilder"/> instance originally passed in.
/// </returns>
[Obsolete("Use IMessageExtractor instead of the ExtractEntityId and ExtractShardId delegates.")]
public static AkkaConfigurationBuilder WithShardRegion<TKey>(
this AkkaConfigurationBuilder builder,
string typeName,
Func<ActorSystem, IActorRegistry, Func<string, Props>> entityPropsFactory,
ExtractEntityId extractEntityId,
ExtractShardId extractShardId,
ShardOptions shardOptions)
{
return builder.WithShardRegion<TKey>(typeName,
(system, registry, _) => entityPropsFactory(system, registry),
extractEntityId, extractShardId, shardOptions);
}
/// <summary>
/// Starts a <see cref="ShardRegion"/> actor for the given entity <see cref="typeName"/>
/// and registers the ShardRegion <see cref="IActorRef"/> with <see cref="TKey"/> in the
/// <see cref="ActorRegistry"/> for this <see cref="ActorSystem"/>.
/// </summary>
/// <param name="builder">
/// The builder instance being configured.
/// </param>
/// <param name="typeName">
/// The name of the entity type
/// </param>
/// <param name="entityPropsFactory">
/// Function that, given an entity id, returns the <see cref="Actor.Props"/> of the entity actors that will be created by the <see cref="Sharding.ShardRegion"/>.
///
/// This function also accepts the <see cref="ActorSystem"/> and the <see cref="IActorRegistry"/> as inputs.
/// </param>
/// <param name="messageExtractor">
/// Functions to extract the entity id, shard id, and the message to send to the entity from the incoming message.
/// </param>
/// <param name="shardOptions">
/// The set of options for configuring <see cref="ClusterShardingSettings"/>
/// </param>
/// <typeparam name="TKey">
/// The type key to use to retrieve the <see cref="IActorRef"/> for this <see cref="ShardRegion"/>.
/// </typeparam>
/// <returns>
/// The same <see cref="AkkaConfigurationBuilder"/> instance originally passed in.
/// </returns>
public static AkkaConfigurationBuilder WithShardRegion<TKey>(
this AkkaConfigurationBuilder builder,
string typeName,
Func<ActorSystem, IActorRegistry, IDependencyResolver, Func<string, Props>> entityPropsFactory,
IMessageExtractor messageExtractor,
ShardOptions shardOptions)
{
builder.AddHocon(
ClusterSharding.DefaultConfig()
.WithFallback(DistributedData.DistributedData.DefaultConfig())
.WithFallback(ClusterSingletonManager.DefaultConfig()),
HoconAddMode.Append);
return builder.StartActors(Resolver);
async Task Resolver(ActorSystem system, IActorRegistry registry, IDependencyResolver resolver)
{
var props = entityPropsFactory(system, registry, resolver);
var shardingConfig = ConfigurationFactory.ParseString(shardOptions.ToString())
.WithFallback(system.Settings.Config.GetConfig("akka.cluster.sharding"));
var coordinatorConfig = system.Settings.Config.GetConfig(
shardingConfig.GetString("coordinator-singleton"));
var settings = ClusterShardingSettings.Create(shardingConfig, coordinatorConfig);
var allocationStrategy = ClusterSharding.Get(system).DefaultShardAllocationStrategy(settings);
var shardRegion = await ClusterSharding.Get(system).StartAsync(
typeName, props, settings, messageExtractor, allocationStrategy,
shardOptions.HandOffStopMessage ?? PoisonPill.Instance).ConfigureAwait(false);
registry.Register<TKey>(shardRegion);
}
}
/// <summary>
/// Starts a <see cref="ShardRegion"/> actor for the given entity <see cref="typeName"/>
/// and registers the ShardRegion <see cref="IActorRef"/> with <see cref="TKey"/> in the
/// <see cref="ActorRegistry"/> for this <see cref="ActorSystem"/>.
/// </summary>
/// <param name="builder">
/// The builder instance being configured.
/// </param>
/// <param name="typeName">
/// The name of the entity type
/// </param>
/// <param name="entityPropsFactory">
/// Function that, given an entity id, returns the <see cref="Actor.Props"/> of the entity actors that will be created by the <see cref="Sharding.ShardRegion"/>.
///
/// This function also accepts the <see cref="ActorSystem"/> and the <see cref="IActorRegistry"/> as inputs.
/// </param>
/// <param name="extractEntityId">
/// Partial function to extract the entity id and the message to send to the entity from the incoming message,
/// if the partial function does not match the message will be `unhandled`,
/// i.e.posted as `Unhandled` messages on the event stream
/// </param>
/// <param name="extractShardId">
/// Function to determine the shard id for an incoming message, only messages that passed the `extractEntityId` will be used
/// </param>
/// <param name="shardOptions">
/// The set of options for configuring <see cref="ClusterShardingSettings"/>
/// </param>
/// <typeparam name="TKey">
/// The type key to use to retrieve the <see cref="IActorRef"/> for this <see cref="ShardRegion"/>.