diff --git a/Projects/Server.Tests/Tests/Network/Packets/Outgoing/AccountPacketTests.cs b/Projects/Server.Tests/Tests/Network/Packets/Outgoing/AccountPacketTests.cs index 0ad93a512..ed3bfd4e3 100644 --- a/Projects/Server.Tests/Tests/Network/Packets/Outgoing/AccountPacketTests.cs +++ b/Projects/Server.Tests/Tests/Network/Packets/Outgoing/AccountPacketTests.cs @@ -51,6 +51,10 @@ public Mobile this[int index] public Serial Serial { get; } public void Deserialize(IGenericReader reader) => throw new NotImplementedException(); + public byte SerializedThread { get; set; } + public int SerializedPosition { get; set; } + public int SerializedLength { get; set; } + public void Serialize(IGenericWriter writer) => throw new NotImplementedException(); public bool Deleted { get; } diff --git a/Projects/Server/Guild.cs b/Projects/Server/Guild.cs index ab11330c3..8998860cf 100644 --- a/Projects/Server/Guild.cs +++ b/Projects/Server/Guild.cs @@ -52,6 +52,10 @@ protected BaseGuild() [CommandProperty(AccessLevel.GameMaster, readOnly: true)] public DateTime Created { get; set; } = Core.Now; + public byte SerializedThread { get; set; } + public int SerializedPosition { get; set; } + public int SerializedLength { get; set; } + public abstract void Serialize(IGenericWriter writer); public abstract void Deserialize(IGenericReader reader); diff --git a/Projects/Server/IEntity.cs b/Projects/Server/IEntity.cs index 4ebc26562..95b11843b 100644 --- a/Projects/Server/IEntity.cs +++ b/Projects/Server/IEntity.cs @@ -115,6 +115,10 @@ public void Deserialize(IGenericReader reader) Timer.StartTimer(Delete); } + public byte SerializedThread { get; set; } + public int SerializedPosition { get; set; } + public int SerializedLength { get; set; } + public void Serialize(IGenericWriter writer) { } diff --git a/Projects/Server/Items/Item.cs b/Projects/Server/Items/Item.cs index 368991f2a..aeab60811 100644 --- a/Projects/Server/Items/Item.cs +++ b/Projects/Server/Items/Item.cs @@ -802,6 +802,10 @@ public virtual void GetProperties(IPropertyList list) [CommandProperty(AccessLevel.Counselor)] public Serial Serial { get; } + public byte SerializedThread { get; set; } + public int SerializedPosition { get; set; } + public int SerializedLength { get; set; } + public virtual void Serialize(IGenericWriter writer) { writer.Write(9); // version diff --git a/Projects/Server/Mobiles/Mobile.cs b/Projects/Server/Mobiles/Mobile.cs index 8b5fe2a64..d6b9eb9a2 100644 --- a/Projects/Server/Mobiles/Mobile.cs +++ b/Projects/Server/Mobiles/Mobile.cs @@ -2277,6 +2277,10 @@ public virtual void GetProperties(IPropertyList list) [CommandProperty(AccessLevel.Counselor)] public Serial Serial { get; } + public byte SerializedThread { get; set; } + public int SerializedPosition { get; set; } + public int SerializedLength { get; set; } + public virtual void Serialize(IGenericWriter writer) { writer.Write(36); // version diff --git a/Projects/Server/Serialization/AdhocPersistence.cs b/Projects/Server/Serialization/AdhocPersistence.cs index 332f3a83c..b60e2482a 100644 --- a/Projects/Server/Serialization/AdhocPersistence.cs +++ b/Projects/Server/Serialization/AdhocPersistence.cs @@ -55,8 +55,8 @@ public static void SerializeAndSnapshot( { var fullPath = PathUtility.GetFullPath(filePath, Core.BaseDirectory); PathUtility.EnsureDirectory(Path.GetDirectoryName(fullPath)); - ConcurrentQueue types = []; - var writer = new MemoryMapFileWriter(new FileStream(filePath, FileMode.Create), sizeHint, types); + HashSet typesSet = []; + var writer = new MemoryMapFileWriter(new FileStream(filePath, FileMode.Create), sizeHint, typesSet); serializer(writer); Task.Run( @@ -67,14 +67,6 @@ public static void SerializeAndSnapshot( writer.Dispose(); fs.Dispose(); - HashSet typesSet = []; - - // Dedupe the queue. - foreach (var type in types) - { - typesSet.Add(type); - } - Persistence.WriteSerializedTypesSnapshot(Path.GetDirectoryName(fullPath), typesSet); }, Core.ClosingTokenSource.Token diff --git a/Projects/Server/Serialization/BinaryFileReader.cs b/Projects/Server/Serialization/BinaryFileReader.cs new file mode 100644 index 000000000..7a6ac2fde --- /dev/null +++ b/Projects/Server/Serialization/BinaryFileReader.cs @@ -0,0 +1,109 @@ +/************************************************************************* + * ModernUO * + * Copyright 2019-2024 - ModernUO Development Team * + * Email: hi@modernuo.com * + * File: BinaryFileReader.cs * + * * + * This program is free software: you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation, either version 3 of the License, or * + * (at your option) any later version. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program. If not, see . * + *************************************************************************/ + +using System; +using System.IO; +using System.IO.MemoryMappedFiles; +using System.Runtime.CompilerServices; +using System.Text; + +namespace Server; + +public sealed unsafe class BinaryFileReader : IDisposable, IGenericReader +{ + private readonly bool _usePrefixes; + private readonly MemoryMappedFile _mmf; + private readonly MemoryMappedViewStream _accessor; + private readonly UnmanagedDataReader _reader; + + public BinaryFileReader(string path, bool usePrefixes = true, Encoding encoding = null) + { + _usePrefixes = usePrefixes; + var fi = new FileInfo(path); + + if (fi.Length > 0) + { + _mmf = MemoryMappedFile.CreateFromFile(path, FileMode.Open); + _accessor = _mmf.CreateViewStream(); + byte* ptr = null; + _accessor.SafeMemoryMappedViewHandle.AcquirePointer(ref ptr); + _reader = new UnmanagedDataReader(ptr, _accessor.Length, encoding: encoding); + } + else + { + _reader = new UnmanagedDataReader(null, 0, encoding: encoding); + } + } + + public long Position => _reader.Position; + + public void Dispose() + { + _accessor?.SafeMemoryMappedViewHandle.ReleasePointer(); + _accessor?.Dispose(); + _mmf?.Dispose(); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public string ReadString(bool intern = false) => _usePrefixes ? _reader.ReadString(intern) : _reader.ReadStringRaw(intern); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public string ReadStringRaw(bool intern = false) => _reader.ReadStringRaw(intern); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public long ReadLong() => _reader.ReadLong(); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public ulong ReadULong() => _reader.ReadULong(); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public int ReadInt() => _reader.ReadInt(); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public uint ReadUInt() => _reader.ReadUInt(); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public short ReadShort() => _reader.ReadShort(); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public ushort ReadUShort() => _reader.ReadUShort(); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public double ReadDouble() => _reader.ReadDouble(); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public float ReadFloat() => _reader.ReadFloat(); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public byte ReadByte() => _reader.ReadByte(); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public sbyte ReadSByte() => _reader.ReadSByte(); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public bool ReadBool() => _reader.ReadBool(); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public Serial ReadSerial() => _reader.ReadSerial(); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public Type ReadType() => _reader.ReadType(); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public int Read(Span buffer) => _reader.Read(buffer); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public long Seek(long offset, SeekOrigin origin) => _reader.Seek(offset, origin); +} diff --git a/Projects/Server/Serialization/BufferWriter.cs b/Projects/Server/Serialization/BufferWriter.cs index 2d425ba1b..54eb0f834 100644 --- a/Projects/Server/Serialization/BufferWriter.cs +++ b/Projects/Server/Serialization/BufferWriter.cs @@ -110,7 +110,7 @@ public virtual void Flush() // Need to avoid buffer.Length = 2, buffer * 2 is 4, but we need 8 or 16bytes, causing an exception. // The least we need is 16bytes + Index, but we use BufferSize since it should always be big enough for a single // non-dynamic field. - Resize(Math.Max(BufferSize, _buffer.Length * 2)); + Resize(Math.Clamp(_buffer.Length * 2, BufferSize, 1024 * 1024 * 64)); } [MethodImpl(MethodImplOptions.AggressiveInlining)] diff --git a/Projects/Server/Serialization/GenericEntityPersistence.cs b/Projects/Server/Serialization/GenericEntityPersistence.cs index 28a9696da..0a8331180 100644 --- a/Projects/Server/Serialization/GenericEntityPersistence.cs +++ b/Projects/Server/Serialization/GenericEntityPersistence.cs @@ -14,7 +14,6 @@ *************************************************************************/ using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.IO; @@ -35,137 +34,82 @@ public interface IGenericEntityPersistence public class GenericEntityPersistence : Persistence, IGenericEntityPersistence where T : class, ISerializable { private static readonly ILogger logger = LogFactory.GetLogger(typeof(GenericEntityPersistence)); - private static List>[] _entities; - private long _initialIdxSize = 1024 * 256; - private long _initialBinSize = 1024 * 1024; + // Support legacy split file serialization + private static Dictionary>> _entities; + private readonly string _name; - private readonly uint _minSerial; - private readonly uint _maxSerial; + private readonly Serial _minSerial; + private readonly Serial _maxSerial; private Serial _lastEntitySerial; private readonly Dictionary _pendingAdd = new(); private readonly Dictionary _pendingDelete = new(); - private readonly uint[] _entitiesCount = new uint[World.GetThreadWorkerCount()]; - - private readonly (MemoryMapFileWriter idxWriter, MemoryMapFileWriter binWriter)[] _writers = - new (MemoryMapFileWriter, MemoryMapFileWriter)[World.GetThreadWorkerCount()]; - public Dictionary EntitiesBySerial { get; } = new(); - public GenericEntityPersistence(string name, int priority, uint minSerial, uint maxSerial) : base(priority) + public GenericEntityPersistence(string name, int priority, uint minSerial, uint maxSerial) : this( + name, + priority, + (Serial)minSerial, + (Serial)maxSerial + ) + { + } + + public GenericEntityPersistence(string name, int priority, Serial minSerial, Serial maxSerial) : base(priority) { _name = name; _minSerial = minSerial; _maxSerial = maxSerial; - _lastEntitySerial = (Serial)(minSerial - 1); + _lastEntitySerial = minSerial - 1; typeof(T).RegisterFindEntity(Find); } - public override void Preserialize(string savePath, ConcurrentQueue types) - { - var path = Path.Combine(savePath, _name); - PathUtility.EnsureDirectory(path); - - var threadCount = World.GetThreadWorkerCount(); - for (var i = 0; i < threadCount; i++) - { - var idxPath = Path.Combine(path, $"{_name}_{i}.idx"); - var binPath = Path.Combine(path, $"{_name}_{i}.bin"); - - _writers[i] = ( - new MemoryMapFileWriter(new FileStream(idxPath, FileMode.Create), _initialIdxSize, types), - new MemoryMapFileWriter(new FileStream(binPath, FileMode.Create), _initialBinSize, types) - ); - - _writers[i].idxWriter.Write(3); // version - _writers[i].idxWriter.Seek(4, SeekOrigin.Current); // Entity count - - _entitiesCount[i] = 0; - } - } - - public override void Serialize(IGenericSerializable e, int threadIndex) + public override void WriteSnapshot(string savePath, HashSet typeSet) { - var (idx, bin) = _writers[threadIndex]; - var pos = bin.Position; + var dir = Path.Combine(savePath, _name); + PathUtility.EnsureDirectory(dir); - var entity = (ISerializable)e; + var threads = World._threadWorkers; - entity.Serialize(bin); - var length = (uint)(bin.Position - pos); + using var binFs = new FileStream(Path.Combine(dir, $"{_name}.bin"), FileMode.Create, FileAccess.Write, FileShare.None); + using var idxFs = new FileStream(Path.Combine(dir, $"{_name}.idx"), FileMode.Create); + using var idx = new MemoryMapFileWriter(idxFs, 1024 * 1024, typeSet); // 1MB - var t = entity.GetType(); - idx.Write(t); - idx.Write(entity.Serial); - idx.Write(entity.Created.Ticks); - idx.Write(pos); - idx.Write(length); + idx.Write(3); // Version + idx.Write(EntitiesBySerial.Values.Count); + var binPosition = 0L; - _entitiesCount[threadIndex]++; - } - - public override void WriteSnapshot() - { - var wroteFile = false; - string folderPath = null; - for (int i = 0; i < _writers.Length; i++) + foreach (var e in EntitiesBySerial.Values) { - var (idxWriter, binWriter) = _writers[i]; - - var binBytesWritten = binWriter.Position; + var thread = e.SerializedThread; + var heapStart = e.SerializedPosition; + var heapLength = e.SerializedLength; - // Write the entity count - var pos = idxWriter.Position; - idxWriter.Seek(4, SeekOrigin.Begin); - idxWriter.Write(_entitiesCount[i]); - idxWriter.Seek(pos, SeekOrigin.Begin); - - var idxFs = idxWriter.FileStream; - var idxFilePath = idxFs.Name; - var binFs = binWriter.FileStream; - var binFilePath = binFs.Name; - - if (_initialIdxSize < idxFs.Position) - { - _initialIdxSize = idxFs.Position; - } + idx.Write(e.GetType()); + idx.Write(e.Serial); + idx.Write(e.Created.Ticks); + idx.Write(binPosition); + idx.Write(heapLength); - if (_initialBinSize < binFs.Position) - { - _initialBinSize = binFs.Position; - } - - idxWriter.Dispose(); - binWriter.Dispose(); - - idxFs.Dispose(); - binFs.Dispose(); - - if (binBytesWritten > 1) + try { - wroteFile = true; + binFs.Write(threads[thread].GetHeap(heapStart, heapLength)); } - else + catch (Exception error) { - File.Delete(idxFilePath); - File.Delete(binFilePath); - folderPath = Path.GetDirectoryName(idxFilePath); + Console.WriteLine("Error writing entity: {0} (Thread: {1} - {2} {3})\n{4}", e, thread, heapStart, heapLength, error); } - } - if (!wroteFile && folderPath != null) - { - Directory.Delete(folderPath); + binPosition += heapLength; } } public override void Serialize() { - World.ResetRoundRobin(); foreach (var entity in EntitiesBySerial.Values) { - World.PushToCache((entity, this)); + World.PushToCache(entity); } } @@ -237,19 +181,22 @@ private unsafe Dictionary ReadTypes(string savePath) public virtual void DeserializeIndexes(string savePath, Dictionary typesDb) { string indexPath = Path.Combine(savePath, _name, $"{_name}.idx"); + + _entities ??= []; + + // Support for legacy MUO Serialization that used split files if (!File.Exists(indexPath)) { - TryDeserializeMultithreadIndexes(savePath, typesDb); + TryDeserializeSplitFileIndexes(savePath, typesDb); return; } - _entities = [InternalDeserializeIndexes(indexPath, typesDb)]; + InternalDeserializeIndexes(indexPath, typesDb, _entities[0] = []); } - private void TryDeserializeMultithreadIndexes(string savePath, Dictionary typesDb) + private void TryDeserializeSplitFileIndexes(string savePath, Dictionary typesDb) { var index = 0; - var fileList = new List(); while (true) { var path = Path.Combine(savePath, _name, $"{_name}_{index}.idx"); @@ -259,36 +206,31 @@ private void TryDeserializeMultithreadIndexes(string savePath, Dictionary>[fileList.Count]; - for (var i = 0; i < fileList.Count; i++) - { - _entities[i] = InternalDeserializeIndexes(fileList[i], typesDb); - } } - private unsafe List> InternalDeserializeIndexes(string filePath, Dictionary typesDb) + private unsafe void InternalDeserializeIndexes( + string filePath, Dictionary typesDb, List> entities + ) { - object[] ctorArgs = new object[1]; - List> entities = []; - using var mmf = MemoryMappedFile.CreateFromFile(filePath, FileMode.Open); using var accessor = mmf.CreateViewStream(); byte* ptr = null; accessor.SafeMemoryMappedViewHandle.AcquirePointer(ref ptr); - UnmanagedDataReader dataReader = new UnmanagedDataReader(ptr, accessor.Length, typesDb); + var dataReader = new UnmanagedDataReader(ptr, accessor.Length); var version = dataReader.ReadInt(); - Dictionary ctors = null; + Dictionary ctors = []; + if (version < 2) { ctors = ReadTypes(Path.GetDirectoryName(filePath)); @@ -296,15 +238,16 @@ private unsafe List> InternalDeserializeIndexes(string filePath, D if (typesDb == null && ctors == null) { - return entities; + return; } - int count = dataReader.ReadInt(); - var now = DateTime.UtcNow; + var ctorArgs = new object[1]; Type[] ctorArguments = [typeof(Serial)]; - for (int i = 0; i < count; ++i) + var count = dataReader.ReadInt(); + + for (var i = 0; i < count; ++i) { ConstructorInfo ctor; // Version 2 & 3 with SerializedTypes.db @@ -331,6 +274,7 @@ private unsafe List> InternalDeserializeIndexes(string filePath, D { dataReader.ReadLong(); // LastSerialized } + var pos = dataReader.ReadLong(); var length = dataReader.ReadInt(); @@ -344,20 +288,17 @@ private unsafe List> InternalDeserializeIndexes(string filePath, D if (ctor.Invoke(ctorArgs) is T entity) { entity.Created = created; - entities.Add(new EntitySpan(entity, pos, (int)length)); + entities.Add(new EntitySpan(entity, pos, length)); EntitiesBySerial[serial] = entity; } } accessor.SafeMemoryMappedViewHandle.ReleasePointer(); - entities.TrimExcess(); if (EntitiesBySerial.Count > 0) { _lastEntitySerial = EntitiesBySerial.Keys.Max(); } - - return entities; } public override void Deserialize(string savePath, Dictionary typesDb) @@ -369,16 +310,13 @@ public override void Deserialize(string savePath, Dictionary type { TryDeserializeMultithread(savePath, typesDb); } - else + else if (fi.Length > 0) { - if (fi.Length == 0) - { - return; - } - InternalDeserialize(dataPath, 0, typesDb); } + _entities.Clear(); + _entities.TrimExcess(); _entities = null; } @@ -462,7 +400,7 @@ private void TryDeserializeMultithread(string savePath, Dictionary max) { - last = (Serial)_minSerial; + last = min; } - if (FindEntity(last) == null) + if (FindEntity((Serial)last) == null) { - return _lastEntitySerial = last; + return _lastEntitySerial = (Serial)last; } } @@ -530,6 +469,7 @@ public void AddEntity(T entity) goto case WorldState.Loading; } case WorldState.Loading: + case WorldState.WritingSave: { if (_pendingDelete.Remove(entity.Serial)) { @@ -540,7 +480,6 @@ public void AddEntity(T entity) break; } case WorldState.PendingSave: - case WorldState.WritingSave: case WorldState.Running: { ref var entityEntry = ref CollectionsMarshal.GetValueRefOrAddDefault(EntitiesBySerial, entity.Serial, out bool exists); @@ -591,13 +530,13 @@ public void RemoveEntity(T entity) goto case WorldState.Loading; } case WorldState.Loading: + case WorldState.WritingSave: { _pendingAdd.Remove(entity.Serial); _pendingDelete[entity.Serial] = entity; break; } case WorldState.PendingSave: - case WorldState.WritingSave: case WorldState.Running: { EntitiesBySerial.Remove(entity.Serial); @@ -669,6 +608,7 @@ public R FindEntity(Serial serial, bool returnDeleted, bool returnPending) wh } case WorldState.Loading: case WorldState.Saving: + case WorldState.WritingSave: { if (returnDeleted && returnPending && _pendingDelete.TryGetValue(serial, out var entity)) { @@ -684,7 +624,6 @@ public R FindEntity(Serial serial, bool returnDeleted, bool returnPending) wh return null; } case WorldState.PendingSave: - case WorldState.WritingSave: case WorldState.Running: { return EntitiesBySerial.TryGetValue(serial, out var entity) ? entity as R : null; diff --git a/Projects/Server/Serialization/GenericPersistence.cs b/Projects/Server/Serialization/GenericPersistence.cs index a584b76a3..a89e19141 100644 --- a/Projects/Server/Serialization/GenericPersistence.cs +++ b/Projects/Server/Serialization/GenericPersistence.cs @@ -14,65 +14,105 @@ *************************************************************************/ using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; +using System.IO.MemoryMappedFiles; namespace Server; public abstract class GenericPersistence : Persistence, IGenericSerializable { - private long _initialSize = 1024 * 1024; - private MemoryMapFileWriter _fileToSave; - public string Name { get; } + public string SaveFilePath { get; protected set; } // "/.bin" - public GenericPersistence(string name, int priority) : base(priority) => Name = name; + public byte SerializedThread { get; set; } + public int SerializedPosition { get; set; } + public int SerializedLength { get; set; } - public override void Preserialize(string savePath, ConcurrentQueue types) + public GenericPersistence(string name, int priority) : base(priority) { - var path = Path.Combine(savePath, Name); - var filePath = Path.Combine(path, $"{Name}.bin"); - PathUtility.EnsureDirectory(path); - - _fileToSave = new MemoryMapFileWriter(new FileStream(filePath, FileMode.Create), _initialSize, types); + Name = name; + SaveFilePath = Path.Combine(Name, $"{Name}.bin"); } public override void Serialize() { - World.ResetRoundRobin(); - World.PushToCache((this, this)); + World.PushToCache(this); } - public override void WriteSnapshot() + public override void WriteSnapshot(string savePath, HashSet typeSet) { - string folderPath = null; - using (var fs = _fileToSave.FileStream) + if (SerializedLength == 0) { - if (fs.Position > _initialSize) - { - _initialSize = fs.Position; - } + return; + } - _fileToSave.Dispose(); - if (_fileToSave.Position == 0) - { - folderPath = Path.GetDirectoryName(fs.Name); - } + var file = Path.Combine(savePath, SaveFilePath); + var dir = Path.GetDirectoryName(file); + PathUtility.EnsureDirectory(dir); + + var threads = World._threadWorkers; + + using var binFs = new FileStream(file, FileMode.Create, FileAccess.Write, FileShare.None); + + var thread = SerializedThread; + var heapStart = SerializedPosition; + var heapLength = SerializedLength; + + binFs.Write(threads[thread].GetHeap(heapStart, heapLength)); + } + + public override unsafe void Deserialize(string savePath, Dictionary typesDb) + { + // Assume savePath has the Core.BaseDirectory already prepended + var dataPath = Path.GetFullPath(SaveFilePath, savePath); + var file = new FileInfo(dataPath); + + if (!file.Exists || file.Length <= 0) + { + return; } - if (folderPath != null) + var fileLength = file.Length; + + string error; + + try + { + using var mmf = MemoryMappedFile.CreateFromFile(dataPath, FileMode.Open); + using var accessor = mmf.CreateViewStream(); + + byte* ptr = null; + accessor.SafeMemoryMappedViewHandle.AcquirePointer(ref ptr); + var dataReader = new UnmanagedDataReader(ptr, accessor.Length, typesDb); + Deserialize(dataReader); + + error = dataReader.Position != fileLength + ? $"Serialized {fileLength} bytes, but {dataReader.Position} bytes deserialized" + : null; + + accessor.SafeMemoryMappedViewHandle.ReleasePointer(); + } + catch (Exception e) { - Directory.Delete(folderPath); + error = e.ToString(); } - } - public override void Serialize(IGenericSerializable e, int threadIndex) => Serialize(_fileToSave); + if (error != null) + { + Console.WriteLine($"***** Bad deserialize of {file.FullName} *****"); + Console.WriteLine(error); - public abstract void Serialize(IGenericWriter writer); + Console.Write("Skip this file and continue? (y/n): "); + var y = Console.ReadLine(); - public override void Deserialize(string savePath, Dictionary typesDb) => - AdhocPersistence.Deserialize(Path.Combine(savePath, Name, $"{Name}.bin"), Deserialize); + if (!y.InsensitiveEquals("y")) + { + throw new Exception("Deserialization failed."); + } + } + } + public abstract void Serialize(IGenericWriter writer); public abstract void Deserialize(IGenericReader reader); } diff --git a/Projects/Server/World/IGenericSerializable.cs b/Projects/Server/Serialization/IGenericSerializable.cs similarity index 90% rename from Projects/Server/World/IGenericSerializable.cs rename to Projects/Server/Serialization/IGenericSerializable.cs index 02e6487af..ecc54042d 100644 --- a/Projects/Server/World/IGenericSerializable.cs +++ b/Projects/Server/Serialization/IGenericSerializable.cs @@ -17,5 +17,9 @@ namespace Server; public interface IGenericSerializable { + byte SerializedThread { get; set; } + int SerializedPosition { get; set; } + int SerializedLength { get; set; } + void Serialize(IGenericWriter writer); } diff --git a/Projects/Server/Serialization/MemoryMapFileWriter.cs b/Projects/Server/Serialization/MemoryMapFileWriter.cs index e5de0849f..67de0a4ab 100644 --- a/Projects/Server/Serialization/MemoryMapFileWriter.cs +++ b/Projects/Server/Serialization/MemoryMapFileWriter.cs @@ -15,7 +15,7 @@ using System; using System.Buffers.Binary; -using System.Collections.Concurrent; +using System.Collections.Generic; using System.IO; using System.IO.MemoryMappedFiles; using System.Runtime.CompilerServices; @@ -29,7 +29,7 @@ public unsafe class MemoryMapFileWriter : IGenericWriter, IDisposable { private readonly Encoding _encoding; - private readonly ConcurrentQueue _types; + private readonly HashSet _types; private readonly FileStream _fileStream; private MemoryMappedFile _mmf; private MemoryMappedViewAccessor _accessor; @@ -37,7 +37,7 @@ public unsafe class MemoryMapFileWriter : IGenericWriter, IDisposable private long _position; private long _size; - public MemoryMapFileWriter(FileStream fileStream, long initialSize, ConcurrentQueue types = null) + public MemoryMapFileWriter(FileStream fileStream, long initialSize, HashSet types = null) { _types = types; _fileStream = fileStream; @@ -244,7 +244,7 @@ public void Write(Type type) { Write((byte)0x2); // xxHash3 64bit Write(AssemblyHandler.GetTypeHash(type)); - _types.Enqueue(type); + _types.Add(type); } } diff --git a/Projects/Server/Serialization/Persistence.cs b/Projects/Server/Serialization/Persistence.cs index d98f6551c..3c41c564e 100644 --- a/Projects/Server/Serialization/Persistence.cs +++ b/Projects/Server/Serialization/Persistence.cs @@ -14,7 +14,6 @@ *************************************************************************/ using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.IO.MemoryMappedFiles; @@ -53,7 +52,7 @@ public static void Load(string path) } } - private unsafe static Dictionary LoadTypes(string path) + private static unsafe Dictionary LoadTypes(string path) { var db = new Dictionary(); @@ -85,32 +84,31 @@ private unsafe static Dictionary LoadTypes(string path) } // Note: This is strictly on a background thread - internal static void PreSerializeAll(string path, ConcurrentQueue types) + internal static void WriteSnapshotAll(string path, HashSet typeSet) { foreach (var p in _registry) { - p.Preserialize(path, types); + p.WriteSnapshot(path, typeSet); } - } - private static readonly HashSet _typesSet = []; + WriteSerializedTypesSnapshot(path, typeSet); + } - // Note: This is strictly on a background thread - internal static void WriteSnapshotAll(string path, ConcurrentQueue types) + public static void WriteSerializedTypesSnapshot(string path, HashSet types) { - foreach (var p in _registry) - { - p.WriteSnapshot(); - } + string typesPath = Path.Combine(path, "SerializedTypes.db"); + using var fs = new FileStream(typesPath, FileMode.Create); + using var writer = new MemoryMapFileWriter(fs, 1024 * 1024 * 4); + + writer.Write(0); // version + writer.Write(types.Count); - // Dedupe the queue. foreach (var type in types) { - _typesSet.Add(type); + var fullName = type.FullName; + writer.Write(HashUtility.ComputeHash64(fullName)); + writer.WriteStringRaw(fullName); } - - WriteSerializedTypesSnapshot(path, _typesSet); - _typesSet.Clear(); } internal static void SerializeAll() @@ -137,32 +135,8 @@ internal static void PostDeserializeAll() } } - public static void WriteSerializedTypesSnapshot(string path, HashSet types) - { - string typesPath = Path.Combine(path, "SerializedTypes.db"); - using var fs = new FileStream(typesPath, FileMode.Create); - using var writer = new MemoryMapFileWriter(fs, 1024 * 1024 * 4); - - writer.Write(0); // version - writer.Write(types.Count); - - foreach (var type in types) - { - var fullName = type.FullName; - writer.Write(HashUtility.ComputeHash64(fullName)); - writer.WriteStringRaw(fullName); - } - } - - // Open file streams, MMFs, prepare data structures - // Note: This should only be run on a background thread - public abstract void Preserialize(string savePath, ConcurrentQueue types); - - // Note: This should only be run on a background thread - public abstract void Serialize(IGenericSerializable e, int threadIndex); - // Note: This should only be run on a background thread - public abstract void WriteSnapshot(); + public abstract void WriteSnapshot(string savePath, HashSet typeSet); public abstract void Serialize(); diff --git a/Projects/Server/Serialization/SerializationThreadWorker.cs b/Projects/Server/Serialization/SerializationThreadWorker.cs new file mode 100644 index 000000000..3d7430b2c --- /dev/null +++ b/Projects/Server/Serialization/SerializationThreadWorker.cs @@ -0,0 +1,125 @@ +/************************************************************************* + * ModernUO * + * Copyright 2019-2024 - ModernUO Development Team * + * Email: hi@modernuo.com * + * File: SerializationThreadWorker.cs * + * * + * This program is free software: you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation, either version 3 of the License, or * + * (at your option) any later version. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program. If not, see . * + *************************************************************************/ + +using System; +using System.Collections.Concurrent; +using System.Runtime.CompilerServices; +using System.Threading; + +namespace Server; + +public class SerializationThreadWorker +{ + private readonly int _index; + private readonly Thread _thread; + private readonly AutoResetEvent _startEvent; // Main thread tells the thread to start working + private readonly AutoResetEvent _stopEvent; // Main thread waits for the worker finish draining + private bool _pause; + private bool _exit; + private byte[] _heap; + + private readonly ConcurrentQueue _entities; + + public SerializationThreadWorker(int index) + { + _index = index; + _startEvent = new AutoResetEvent(false); + _stopEvent = new AutoResetEvent(false); + _entities = new ConcurrentQueue(); + _thread = new Thread(Execute); + _thread.Start(this); + } + + public void Wake() + { + _startEvent.Set(); + } + + public void Sleep() + { + Volatile.Write(ref _pause, true); + _stopEvent.WaitOne(); + } + + public void Exit() + { + _exit = true; + Wake(); + Sleep(); + } + + // 6GB heap, divide by number of threads from World.GetThreadWorkerCount() + private const ulong totalMemory = 1024 * 1024; + // private static readonly ulong _memoryPerThread = totalMemory / (ulong)World.GetThreadWorkerCount(); + + // private static ulong NextBlockSize(ulong amount) => (amount + 4096UL - 1) & ~(4096UL - 1); + + // public void AllocateHeap() => _heap ??= GC.AllocateUninitializedArray((int)NextBlockSize(_memoryPerThread)); + + public void AllocateHeap() => _heap ??= GC.AllocateUninitializedArray((int)totalMemory); + + public void DeallocateHeap() + { + _heap = null; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void Push(IGenericSerializable entity) => _entities.Enqueue(entity); + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public ReadOnlySpan GetHeap(int start, int length) => _heap.AsSpan(start, length); + + private static void Execute(object obj) + { + var worker = (SerializationThreadWorker)obj; + var threadIndex = (byte)worker._index; + + var queue = worker._entities; + var serializedTypes = World.SerializedTypes; + + while (worker._startEvent.WaitOne()) + { + var writer = new BufferWriter(worker._heap, true, serializedTypes); + + while (true) + { + var pauseRequested = Volatile.Read(ref worker._pause); + if (queue.TryDequeue(out var e)) + { + e.SerializedThread = threadIndex; + var start = e.SerializedPosition = (int)writer.Position; + e.Serialize(writer); + e.SerializedLength = (int)(writer.Position - start); + } + else if (pauseRequested) // Break when finished + { + break; + } + } + + worker._heap = writer.Buffer; + + writer.Close(); + + worker._stopEvent.Set(); // Allow the main thread to continue now that we are finished + worker._pause = false; + + if (Core.Closing || worker._exit) + { + return; + } + } + } +} diff --git a/Projects/Server/World/World.cs b/Projects/Server/World/World.cs index 32659f751..1b76a5e71 100644 --- a/Projects/Server/World/World.cs +++ b/Projects/Server/World/World.cs @@ -45,7 +45,7 @@ public static class World private static readonly GenericEntityPersistence _guildPersistence = new("Guilds", 3, 1, 0x7FFFFFFF); private static int _threadId; - private static readonly SerializationThreadWorker[] _threadWorkers = new SerializationThreadWorker[Math.Max(Environment.ProcessorCount - 1, 1)]; + internal static readonly SerializationThreadWorker[] _threadWorkers = new SerializationThreadWorker[Math.Max(Environment.ProcessorCount - 1, 1)]; private static readonly ManualResetEvent _diskWriteHandle = new(true); private static readonly ConcurrentQueue _decayQueue = new(); @@ -271,13 +271,19 @@ public static void Save() ThreadPool.QueueUserWorkItem(Preserialize); } - internal static void Preserialize(object state) + private static void Preserialize(object state) { var tempPath = PathUtility.EnsureRandomPath(_tempSavePath); try { - Persistence.PreSerializeAll(tempPath, SerializedTypes); + // Allocate the heaps for the GC + foreach (var worker in _threadWorkers) + { + worker.AllocateHeap(); + } + + WakeSerializationThreads(); Core.RequestSnapshot(tempPath); } catch (Exception ex) @@ -300,6 +306,8 @@ internal static void Snapshot(string snapshotPath) _diskWriteHandle.Reset(); + NetState.FlushAll(); + WorldState = WorldState.Saving; Broadcast(0x35, true, "The world is saving, please wait."); @@ -314,7 +322,6 @@ internal static void Snapshot(string snapshotPath) { _serializationStart = Core.Now; - WakeSerializationThreads(); Persistence.SerializeAll(); PauseSerializationThreads(); EventSink.InvokeWorldSave(); @@ -324,9 +331,8 @@ internal static void Snapshot(string snapshotPath) exception = ex; } - WorldState = WorldState.PendingSave; + WorldState = WorldState.WritingSave; ThreadPool.QueueUserWorkItem(WriteFiles, snapshotPath); - Persistence.PostWorldSaveAll(); // Process safety queues watch.Stop(); if (exception == null) @@ -345,6 +351,8 @@ internal static void Snapshot(string snapshotPath) } } + private static readonly HashSet _typesSet = []; + private static void WriteFiles(object state) { var snapshotPath = (string)state; @@ -352,7 +360,16 @@ private static void WriteFiles(object state) { var watch = Stopwatch.StartNew(); logger.Information("Writing world save snapshot"); - Persistence.WriteSnapshotAll(snapshotPath, SerializedTypes); + + // Dedupe the types + while (SerializedTypes.TryDequeue(out var type)) + { + _typesSet.Add(type); + } + + Persistence.WriteSnapshotAll(snapshotPath, _typesSet); + + _typesSet.Clear(); try { @@ -379,8 +396,15 @@ private static void WriteFiles(object state) // Clear types SerializedTypes.Clear(); - _diskWriteHandle.Set(); + Core.LoopContext.Post(FinishWorldSave); + } + + private static void FinishWorldSave() + { WorldState = WorldState.Running; + _diskWriteHandle.Set(); + + Persistence.PostWorldSaveAll(); // Process decay and safety queues } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -408,9 +432,9 @@ internal static void PauseSerializationThreads() internal static void ResetRoundRobin() => _threadId = 0; [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal static void PushToCache((IGenericSerializable e, Persistence p) ep) + internal static void PushToCache(IGenericSerializable e) { - _threadWorkers[_threadId++].Push(ep); + _threadWorkers[_threadId++].Push(e); if (_threadId == _threadWorkers.Length) { _threadId = 0; @@ -520,7 +544,7 @@ public override void Serialize() EnqueueForDecay(item); } - PushToCache((item, this)); + PushToCache(item); } } @@ -551,78 +575,4 @@ public override void PostDeserialize() } } } - - private class SerializationThreadWorker - { - private readonly int _index; - private readonly Thread _thread; - private readonly AutoResetEvent _startEvent; // Main thread tells the thread to start working - private readonly AutoResetEvent _stopEvent; // Main thread waits for the worker finish draining - private bool _pause; - private bool _exit; - private readonly ConcurrentQueue<(IGenericSerializable, Persistence)> _entities; - - public SerializationThreadWorker(int index) - { - _index = index; - _startEvent = new AutoResetEvent(false); - _stopEvent = new AutoResetEvent(false); - _entities = new ConcurrentQueue<(IGenericSerializable, Persistence)>(); - _thread = new Thread(Execute); - _thread.Start(this); - } - - public void Wake() - { - _startEvent.Set(); - } - - public void Sleep() - { - Volatile.Write(ref _pause, true); - _stopEvent.WaitOne(); - } - - public void Exit() - { - _exit = true; - Wake(); - Sleep(); - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void Push((IGenericSerializable e, Persistence p) ep) => _entities.Enqueue(ep); - - private static void Execute(object obj) - { - SerializationThreadWorker worker = (SerializationThreadWorker)obj; - - var reader = worker._entities; - - while (worker._startEvent.WaitOne()) - { - while (true) - { - bool pauseRequested = Volatile.Read(ref worker._pause); - if (reader.TryDequeue(out var ep)) - { - var (e, p) = ep; - p.Serialize(e, worker._index); - } - else if (pauseRequested) // Break when finished - { - break; - } - } - - worker._stopEvent.Set(); // Allow the main thread to continue now that we are finished - worker._pause = false; - - if (Core.Closing || worker._exit) - { - return; - } - } - } - } } diff --git a/Projects/UOContent/Accounting/Account.cs b/Projects/UOContent/Accounting/Account.cs index 46c2b0071..ffd0b1bf3 100644 --- a/Projects/UOContent/Accounting/Account.cs +++ b/Projects/UOContent/Accounting/Account.cs @@ -284,6 +284,10 @@ public bool Inactive public Serial Serial { get; set; } + public byte SerializedThread { get; set; } + public int SerializedPosition { get; set; } + public int SerializedLength { get; set; } + [AfterDeserialization(false)] private void AfterDeserialization() { diff --git a/Projects/UOContent/Engines/Bulk Orders/Books/BaseBOBEntry.cs b/Projects/UOContent/Engines/Bulk Orders/Books/BaseBOBEntry.cs index d3bbadb45..163dc3a32 100644 --- a/Projects/UOContent/Engines/Bulk Orders/Books/BaseBOBEntry.cs +++ b/Projects/UOContent/Engines/Bulk Orders/Books/BaseBOBEntry.cs @@ -26,6 +26,10 @@ public abstract partial class BaseBOBEntry : IBOBEntry public Serial Serial { get; } + public byte SerializedThread { get; set; } + public int SerializedPosition { get; set; } + public int SerializedLength { get; set; } + public bool Deleted { get; private set; } public BaseBOBEntry() diff --git a/Projects/UOContent/Engines/Ethics/Core/EthicsEntity.cs b/Projects/UOContent/Engines/Ethics/Core/EthicsEntity.cs index 5ce677cb8..aaf5af3be 100644 --- a/Projects/UOContent/Engines/Ethics/Core/EthicsEntity.cs +++ b/Projects/UOContent/Engines/Ethics/Core/EthicsEntity.cs @@ -16,6 +16,10 @@ public EthicsEntity() public Serial Serial { get; } + public byte SerializedThread { get; set; } + public int SerializedPosition { get; set; } + public int SerializedLength { get; set; } + public bool Deleted { get; private set; } public void Delete()