From 07b1cdecbb04c9fa9861aa1b31894227998a7480 Mon Sep 17 00:00:00 2001 From: Shannon Joyner Date: Mon, 19 Aug 2019 09:37:03 -0700 Subject: [PATCH 1/3] Refactor so shard / non-shard cases can use the same code The shard / non-shard cases in CRA use different API. Ideally, we would have a base class for the common API and have the shard case inherit from the base class + either VertexBase or ShardedVertexBase. However, C# does not support multiple inheritence. To get around this, we move the initialized methods into an non-shard case that calls the original Ambrosia initialize function. This way we can easily add the sharded case. To make this cleaner and potentially easier to test, we move this logic into a different file. --- Ambrosia/Ambrosia/Initialize.cs | 71 ++++++++++++++++++++++++++++++ Ambrosia/Ambrosia/Program.cs | 76 ++++++--------------------------- ImmortalCoordinator/Program.cs | 2 +- 3 files changed, 84 insertions(+), 65 deletions(-) create mode 100644 Ambrosia/Ambrosia/Initialize.cs diff --git a/Ambrosia/Ambrosia/Initialize.cs b/Ambrosia/Ambrosia/Initialize.cs new file mode 100644 index 00000000..4d0f9f33 --- /dev/null +++ b/Ambrosia/Ambrosia/Initialize.cs @@ -0,0 +1,71 @@ +using CRA.ClientLibrary; +using System.IO; +using System.Threading.Tasks; +using System.Xml.Serialization; + +namespace Ambrosia +{ + public class AmbrosiaRuntimeParams + { + public int serviceReceiveFromPort; + public int serviceSendToPort; + public string serviceName; + public string AmbrosiaBinariesLocation; + public string serviceLogPath; + public bool? createService; + public bool pauseAtStart; + public bool persistLogs; + public bool activeActive; + public long logTriggerSizeMB; + public string storageConnectionString; + public long currentVersion; + public long upgradeToVersion; + } + + public static class AmbrosiaRuntimeParms + { + public static bool _looseAttach = false; + } + + public class AmbrosiaNonShardedRuntime : VertexBase + { + private AmbrosiaRuntime Runtime { get; set; } + public AmbrosiaNonShardedRuntime() + { + Runtime = new AmbrosiaRuntime(); + } + + public override async Task InitializeAsync(object param) + { + // Workaround because of parameter type limitation in CRA + AmbrosiaRuntimeParams p = new AmbrosiaRuntimeParams(); + XmlSerializer xmlSerializer = new XmlSerializer(p.GetType()); + using (StringReader textReader = new StringReader((string)param)) + { + p = (AmbrosiaRuntimeParams)xmlSerializer.Deserialize(textReader); + } + + bool sharded = false; + + Runtime.Initialize( + p.serviceReceiveFromPort, + p.serviceSendToPort, + p.serviceName, + p.serviceLogPath, + p.createService, + p.pauseAtStart, + p.persistLogs, + p.activeActive, + p.logTriggerSizeMB, + p.storageConnectionString, + p.currentVersion, + p.upgradeToVersion, + ClientLibrary, + AddAsyncInputEndpoint, + AddAsyncOutputEndpoint + ); + + return; + } + } +} diff --git a/Ambrosia/Ambrosia/Program.cs b/Ambrosia/Ambrosia/Program.cs index d7bd97cb..b0ae5da5 100644 --- a/Ambrosia/Ambrosia/Program.cs +++ b/Ambrosia/Ambrosia/Program.cs @@ -1042,29 +1042,7 @@ public OutputConnectionRecord(AmbrosiaRuntime inAmbrosia) } } - public class AmbrosiaRuntimeParams - { - public int serviceReceiveFromPort; - public int serviceSendToPort; - public string serviceName; - public string AmbrosiaBinariesLocation; - public string serviceLogPath; - public bool? createService; - public bool pauseAtStart; - public bool persistLogs; - public bool activeActive; - public long logTriggerSizeMB; - public string storageConnectionString; - public long currentVersion; - public long upgradeToVersion; - } - - public static class AmbrosiaRuntimeParms - { - public static bool _looseAttach = false; - } - - public class AmbrosiaRuntime : VertexBase + public class AmbrosiaRuntime { #if _WINDOWS [DllImport("Kernel32.dll", CallingConvention = CallingConvention.Winapi)] @@ -3870,38 +3848,6 @@ private void InitializeLogWriterStatics() #endif } - public override async Task InitializeAsync(object param) - { - InitializeLogWriterStatics(); - - // Workaround because of parameter type limitation in CRA - AmbrosiaRuntimeParams p = new AmbrosiaRuntimeParams(); - XmlSerializer xmlSerializer = new XmlSerializer(p.GetType()); - using (StringReader textReader = new StringReader((string)param)) - { - p = (AmbrosiaRuntimeParams)xmlSerializer.Deserialize(textReader); - } - - bool sharded = false; - - Initialize( - p.serviceReceiveFromPort, - p.serviceSendToPort, - p.serviceName, - p.serviceLogPath, - p.createService, - p.pauseAtStart, - p.persistLogs, - p.activeActive, - p.logTriggerSizeMB, - p.storageConnectionString, - p.currentVersion, - p.upgradeToVersion, - sharded - ); - return; - } - internal void RuntimeChecksOnProcessStart() { if (!_createService) @@ -3955,9 +3901,13 @@ public void Initialize(int serviceReceiveFromPort, string storageConnectionString, long currentVersion, long upgradeToVersion, - bool sharded + CRAClientLibrary coral, + Action addInput, + Action addOutput ) { + InitializeLogWriterStatics(); + _runningRepro = false; _currentVersion = currentVersion; _upgradeToVersion = upgradeToVersion; @@ -3979,11 +3929,9 @@ bool sharded _localServiceSendToPort = serviceSendToPort; _serviceName = serviceName; _storageConnectionString = storageConnectionString; - _sharded = sharded; - _coral = ClientLibrary; + _coral = coral; Console.WriteLine("Logs directory: {0}", _serviceLogPath); - if (createService == null) { if (_logWriterStatics.DirectoryExists(RootDirectory())) @@ -3995,10 +3943,10 @@ bool sharded createService = true; } } - AddAsyncInputEndpoint(AmbrosiaDataInputsName, new AmbrosiaInput(this, "data")); - AddAsyncInputEndpoint(AmbrosiaControlInputsName, new AmbrosiaInput(this, "control")); - AddAsyncOutputEndpoint(AmbrosiaDataOutputsName, new AmbrosiaOutput(this, "data")); - AddAsyncOutputEndpoint(AmbrosiaControlOutputsName, new AmbrosiaOutput(this, "control")); + addInput(AmbrosiaDataInputsName, new AmbrosiaInput(this, "data")); + addInput(AmbrosiaControlInputsName, new AmbrosiaInput(this, "control")); + addOutput(AmbrosiaDataOutputsName, new AmbrosiaOutput(this, "data")); + addOutput(AmbrosiaControlOutputsName, new AmbrosiaOutput(this, "control")); _createService = createService.Value; RecoverOrStartAsync().Wait(); } @@ -4087,7 +4035,7 @@ static void Main(string[] args) try { - if (client.DefineVertexAsync(param.AmbrosiaBinariesLocation, () => new AmbrosiaRuntime()).GetAwaiter().GetResult() != CRAErrorCode.Success) + if (client.DefineVertexAsync(param.AmbrosiaBinariesLocation, () => new AmbrosiaNonShardedRuntime()).GetAwaiter().GetResult() != CRAErrorCode.Success) { throw new Exception(); } diff --git a/ImmortalCoordinator/Program.cs b/ImmortalCoordinator/Program.cs index 0e449bfc..0c3f96c5 100644 --- a/ImmortalCoordinator/Program.cs +++ b/ImmortalCoordinator/Program.cs @@ -96,7 +96,7 @@ static void Main(string[] args) dataProvider, descriptor, connectionsPoolPerWorker); worker.DisableDynamicLoading(); - worker.SideloadVertex(new AmbrosiaRuntime(), "ambrosia"); + worker.SideloadVertex(new AmbrosiaNonShardedRuntime(), "ambrosia"); worker.Start(); } From b152b6de946982902256298810beb601d17c2547 Mon Sep 17 00:00:00 2001 From: Shannon Joyner Date: Fri, 6 Dec 2019 15:18:23 -0800 Subject: [PATCH 2/3] Add Sharded Vertex base --- Ambrosia/Ambrosia/Initialize.cs | 42 +++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/Ambrosia/Ambrosia/Initialize.cs b/Ambrosia/Ambrosia/Initialize.cs index 4d0f9f33..e8c1eae1 100644 --- a/Ambrosia/Ambrosia/Initialize.cs +++ b/Ambrosia/Ambrosia/Initialize.cs @@ -68,4 +68,46 @@ public override async Task InitializeAsync(object param) return; } } + + public class AmbrosiaShardedRuntime : ShardedVertexBase + { + private AmbrosiaRuntime Runtime { get; set; } + public AmbrosiaShardedRuntime() + { + Runtime = new AmbrosiaRuntime(); + } + + public override async Task InitializeAsync(int shardId, ShardingInfo shardingInfo, object param) + { + // Workaround because of parameter type limitation in CRA + AmbrosiaRuntimeParams p = new AmbrosiaRuntimeParams(); + XmlSerializer xmlSerializer = new XmlSerializer(p.GetType()); + using (StringReader textReader = new StringReader((string)param)) + { + p = (AmbrosiaRuntimeParams)xmlSerializer.Deserialize(textReader); + } + + bool sharded = true; + + Runtime.Initialize( + p.serviceReceiveFromPort, + p.serviceSendToPort, + p.serviceName, + p.serviceLogPath, + p.createService, + p.pauseAtStart, + p.persistLogs, + p.activeActive, + p.logTriggerSizeMB, + p.storageConnectionString, + p.currentVersion, + p.upgradeToVersion, + ClientLibrary, + AddAsyncInputEndpoint, + AddAsyncOutputEndpoint + ); + + return; + } + } } From 52a2e49d90633794991c5b55b3e1b0c4855064c5 Mon Sep 17 00:00:00 2001 From: Shannon Joyner Date: Fri, 6 Dec 2019 16:05:04 -0800 Subject: [PATCH 3/3] Add parameters and test for shard case It's the same test as AMB_Basic_Test, except with shard params. --- Ambrosia/Ambrosia/Initialize.cs | 28 +++++- Ambrosia/Ambrosia/Program.cs | 82 ++++++++++++++--- .../AmbrosiaTest/Cmp/shardbasictest_AMB1.cmp | 1 + .../AmbrosiaTest/Cmp/shardbasictest_AMB2.cmp | 1 + .../Cmp/shardbasictest_ClientJob.cmp | 9 ++ .../Cmp/shardbasictest_ClientJob_Verify.cmp | 9 ++ .../Cmp/shardbasictest_Server.cmp | 18 ++++ .../Cmp/shardbasictest_Server_Verify.cmp | 18 ++++ .../EndToEndStressIntegration_Test.cs | 87 +++++++++++++++++++ AmbrosiaTest/AmbrosiaTest/Utilities.cs | 67 ++++++++++++-- AmbrosiaTest/AmbrosiaTest/app.config | 2 +- ImmortalCoordinator/Program.cs | 2 + 12 files changed, 305 insertions(+), 19 deletions(-) create mode 100644 AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_AMB1.cmp create mode 100644 AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_AMB2.cmp create mode 100644 AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_ClientJob.cmp create mode 100644 AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_ClientJob_Verify.cmp create mode 100644 AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_Server.cmp create mode 100644 AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_Server_Verify.cmp diff --git a/Ambrosia/Ambrosia/Initialize.cs b/Ambrosia/Ambrosia/Initialize.cs index e8c1eae1..697c26be 100644 --- a/Ambrosia/Ambrosia/Initialize.cs +++ b/Ambrosia/Ambrosia/Initialize.cs @@ -20,6 +20,9 @@ public class AmbrosiaRuntimeParams public string storageConnectionString; public long currentVersion; public long upgradeToVersion; + public long shardID; + public string oldShards; + public string newShards; } public static class AmbrosiaRuntimeParms @@ -35,6 +38,23 @@ public AmbrosiaNonShardedRuntime() Runtime = new AmbrosiaRuntime(); } + private long[] ParseLongs(string s) + { + if (s.Trim() == "") + { + return new long[] { }; + } + string[] shards = s.Split(','); + long[] ids = new long[shards.Length]; + + for (int i = 0; i < shards.Length; i++) + { + ids[i] = long.Parse(shards[i]); + } + return ids; + + } + public override async Task InitializeAsync(object param) { // Workaround because of parameter type limitation in CRA @@ -62,7 +82,9 @@ public override async Task InitializeAsync(object param) p.upgradeToVersion, ClientLibrary, AddAsyncInputEndpoint, - AddAsyncOutputEndpoint + AddAsyncOutputEndpoint, + ParseLongs(p.oldShards), + ParseLongs(p.newShards) ); return; @@ -104,7 +126,9 @@ public override async Task InitializeAsync(int shardId, ShardingInfo shardingInf p.upgradeToVersion, ClientLibrary, AddAsyncInputEndpoint, - AddAsyncOutputEndpoint + AddAsyncOutputEndpoint, + new long[0], + new long[0] ); return; diff --git a/Ambrosia/Ambrosia/Program.cs b/Ambrosia/Ambrosia/Program.cs index b0ae5da5..b98db39d 100644 --- a/Ambrosia/Ambrosia/Program.cs +++ b/Ambrosia/Ambrosia/Program.cs @@ -2038,6 +2038,8 @@ public async Task FromStreamAsync(Stream stream, string otherProcess, string oth bool _sharded; internal bool _createService; long _shardID; + long[] _oldShards; + long[] _newShards; bool _runningRepro; long _currentVersion; long _upgradeToVersion; @@ -3903,7 +3905,9 @@ public void Initialize(int serviceReceiveFromPort, long upgradeToVersion, CRAClientLibrary coral, Action addInput, - Action addOutput + Action addOutput, + long[] oldShards, + long[] newShards ) { InitializeLogWriterStatics(); @@ -3921,6 +3925,8 @@ Action addOutput { Console.WriteLine("Ready ..."); } + _oldShards = oldShards; + _newShards = newShards; _persistLogs = persistLogs; _activeActive = activeActive; _newLogTriggerSize = logTriggerSizeMB * 1000000; @@ -3992,6 +3998,45 @@ class Program private static long _logTriggerSizeMB = 1000; private static int _currentVersion = 0; private static long _upgradeVersion = -1; + private static long _shardID = -1; + private static string _oldShards = ""; + private static string _newShards = ""; + + private static void DefineVertex(CRAClientLibrary client, string vertexDefinition, bool sharded) + { + Task result; + if (!sharded) + { + result = client.DefineVertexAsync(vertexDefinition, () => new AmbrosiaNonShardedRuntime()); + } else + { + result = client.DefineVertexAsync(vertexDefinition, () => new AmbrosiaShardedRuntime()); + } + + if (result.GetAwaiter().GetResult() != CRAErrorCode.Success) + { + throw new Exception(); + } + } + + private static void InstantiateVertex(CRAClientLibrary client, string instanceName, string vertexName, string vertexDefinition, object vertexParameter, long shardID) + { + CRAErrorCode result; + if (shardID == -1) + { + result = client.InstantiateVertexAsync(instanceName, vertexName, vertexDefinition, vertexParameter).GetAwaiter().GetResult(); + } + else + { + ConcurrentDictionary vertexShards = new ConcurrentDictionary(); + vertexShards[instanceName] = (int)shardID; + result = client.InstantiateShardedVertex(vertexName, vertexDefinition, vertexParameter, vertexShards); + } + if (result != CRAErrorCode.Success) + { + throw new Exception(); + } + } static void Main(string[] args) { @@ -4005,6 +4050,7 @@ static void Main(string[] args) _isTestingUpgrade, _serviceReceiveFromPort, _serviceSendToPort); return; case LocalAmbrosiaRuntimeModes.AddReplica: + case LocalAmbrosiaRuntimeModes.AddShard: case LocalAmbrosiaRuntimeModes.RegisterInstance: if (_runtimeMode == LocalAmbrosiaRuntimeModes.AddReplica) { @@ -4016,6 +4062,7 @@ static void Main(string[] args) client.DisableArtifactUploading(); var replicaName = $"{_instanceName}{_replicaNumber}"; + AmbrosiaRuntimeParams param = new AmbrosiaRuntimeParams(); param.createService = _recoveryMode == AmbrosiaRecoveryModes.A ? (bool?)null @@ -4032,13 +4079,13 @@ static void Main(string[] args) param.serviceLogPath = _serviceLogPath; param.AmbrosiaBinariesLocation = _binariesLocation; param.storageConnectionString = Environment.GetEnvironmentVariable("AZURE_STORAGE_CONN_STRING"); + param.shardID = _shardID; + param.oldShards = _oldShards; + param.newShards = _newShards; try { - if (client.DefineVertexAsync(param.AmbrosiaBinariesLocation, () => new AmbrosiaNonShardedRuntime()).GetAwaiter().GetResult() != CRAErrorCode.Success) - { - throw new Exception(); - } + DefineVertex(client, param.AmbrosiaBinariesLocation, _shardID >= 0); // Workaround because of limitation in parameter serialization in CRA XmlSerializer xmlSerializer = new XmlSerializer(param.GetType()); @@ -4048,11 +4095,7 @@ static void Main(string[] args) xmlSerializer.Serialize(textWriter, param); serializedParams = textWriter.ToString(); } - - if (client.InstantiateVertexAsync(replicaName, param.serviceName, param.AmbrosiaBinariesLocation, serializedParams).GetAwaiter().GetResult() != CRAErrorCode.Success) - { - throw new Exception(); - } + InstantiateVertex(client, replicaName, param.serviceName, param.AmbrosiaBinariesLocation, serializedParams, _shardID); client.AddEndpointAsync(param.serviceName, AmbrosiaRuntime.AmbrosiaDataInputsName, true, true).Wait(); client.AddEndpointAsync(param.serviceName, AmbrosiaRuntime.AmbrosiaDataOutputsName, false, true).Wait(); client.AddEndpointAsync(param.serviceName, AmbrosiaRuntime.AmbrosiaControlInputsName, true, true).Wait(); @@ -4085,6 +4128,7 @@ private static OptionSet ParseOptions(string[] args, out bool shouldShowHelp) { "rp|receivePort=", "The service receive from port [REQUIRED].", rp => _serviceReceiveFromPort = int.Parse(rp) }, { "sp|sendPort=", "The service send to port. [REQUIRED]", sp => _serviceSendToPort = int.Parse(sp) }, { "l|log=", "The service log path.", l => _serviceLogPath = l }, + {"si|shardID=", "The shard ID of the instance", si => _shardID = long.Parse(si) }, }; var helpOption = new OptionSet @@ -4118,16 +4162,23 @@ private static OptionSet ParseOptions(string[] args, out bool shouldShowHelp) { "tu|testingUpgrade", "Is testing upgrade.", u => _isTestingUpgrade = true }, }); + var addShardOptionSet = new OptionSet + { + {"os|oldShards=", "Comma separated list of shards to recover from [REQUIRED].", os => _oldShards = os }, + {"ns|newShards=", "Comma separated list of new shards being created [REQUIRED].", ns => _newShards = ns } + }.AddMany(registerInstanceOptionSet); + registerInstanceOptionSet = registerInstanceOptionSet.AddMany(helpOption); addReplicaOptionSet = addReplicaOptionSet.AddMany(helpOption); debugInstanceOptionSet = debugInstanceOptionSet.AddMany(helpOption); - + addShardOptionSet = addShardOptionSet.AddMany(helpOption); var runtimeModeToOptionSet = new Dictionary { { LocalAmbrosiaRuntimeModes.RegisterInstance, registerInstanceOptionSet}, { LocalAmbrosiaRuntimeModes.AddReplica, addReplicaOptionSet}, { LocalAmbrosiaRuntimeModes.DebugInstance, debugInstanceOptionSet}, + { LocalAmbrosiaRuntimeModes.AddShard, addShardOptionSet }, }; _runtimeMode = default(LocalAmbrosiaRuntimeModes); @@ -4160,6 +4211,7 @@ public enum LocalAmbrosiaRuntimeModes AddReplica, RegisterInstance, DebugInstance, + AddShard, } public enum AmbrosiaRecoveryModes @@ -4194,6 +4246,14 @@ private static void ValidateOptions(OptionSet options, bool shouldShowHelp) } } + if (_runtimeMode == LocalAmbrosiaRuntimeModes.AddShard) + { + if (_shardID == -1) + { + errorMessage += "Shard ID is required.\n"; + } + } + // handles the case when an upgradeversion is not specified if (_upgradeVersion == -1) { diff --git a/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_AMB1.cmp b/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_AMB1.cmp new file mode 100644 index 00000000..3797d308 --- /dev/null +++ b/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_AMB1.cmp @@ -0,0 +1 @@ +The CRA instance appears to be down. Restart it and this vertex will be instantiated automatically diff --git a/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_AMB2.cmp b/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_AMB2.cmp new file mode 100644 index 00000000..3797d308 --- /dev/null +++ b/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_AMB2.cmp @@ -0,0 +1 @@ +The CRA instance appears to be down. Restart it and this vertex will be instantiated automatically diff --git a/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_ClientJob.cmp b/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_ClientJob.cmp new file mode 100644 index 00000000..a0010664 --- /dev/null +++ b/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_ClientJob.cmp @@ -0,0 +1,9 @@ +Bytes per RPC Throughput (GB/sec) +*X* 32768 0.0538327740000449 +Service Received 1024 MB so far +*X* 16384 0.0709862409498754 +Service Received 2048 MB so far +*X* 8192 0.0695878693925042 +Service Received 3072 MB so far +Bytes received: 3221225472 +DONE diff --git a/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_ClientJob_Verify.cmp b/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_ClientJob_Verify.cmp new file mode 100644 index 00000000..57436b68 --- /dev/null +++ b/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_ClientJob_Verify.cmp @@ -0,0 +1,9 @@ +Bytes per RPC Throughput (GB/sec) +*X* 32768 0.00943044129323776 +Service Received 1024 MB so far +*X* 16384 0.00989352845861985 +Service Received 2048 MB so far +*X* 8192 0.00993638850272688 +Service Received 3072 MB so far +Bytes received: 3221225472 +DONE diff --git a/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_Server.cmp b/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_Server.cmp new file mode 100644 index 00000000..29933c01 --- /dev/null +++ b/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_Server.cmp @@ -0,0 +1,18 @@ +*X* At checkpoint, received 0 messages +*X* becoming primary +*X* Server in Entry Point +*X* I'm healthy after 3000 checks at time:10/24/2018 1:15:13 PM +*X* I'm healthy after 6000 checks at time:10/24/2018 1:15:19 PM +*X* At checkpoint, received 30564 messages +Received 1024 MB so far +*X* I'm healthy after 9000 checks at time:10/24/2018 1:15:25 PM +*X* I'm healthy after 12000 checks at time:10/24/2018 1:15:31 PM +*X* At checkpoint, received 89584 messages +Received 2048 MB so far +*X* I'm healthy after 15000 checks at time:10/24/2018 1:15:37 PM +*X* I'm healthy after 18000 checks at time:10/24/2018 1:15:43 PM +*X* At checkpoint, received 202934 messages +*X* I'm healthy after 21000 checks at time:10/24/2018 1:15:49 PM +Received 3072 MB so far +Bytes received: 3221225472 +DONE diff --git a/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_Server_Verify.cmp b/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_Server_Verify.cmp new file mode 100644 index 00000000..a3e33993 --- /dev/null +++ b/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_Server_Verify.cmp @@ -0,0 +1,18 @@ +*X* Server in Entry Point +*X* I'm healthy after 3000 checks at time:10/24/2018 1:20:10 PM +*X* I'm healthy after 6000 checks at time:10/24/2018 1:20:16 PM +Received 1024 MB so far +*X* I'm healthy after 9000 checks at time:10/24/2018 1:20:22 PM +*X* I'm healthy after 12000 checks at time:10/24/2018 1:20:28 PM +Received 2048 MB so far +*X* I'm healthy after 15000 checks at time:10/24/2018 1:20:34 PM +*X* I'm healthy after 18000 checks at time:10/24/2018 1:20:40 PM +*X* I'm healthy after 21000 checks at time:10/24/2018 1:20:46 PM +*X* I'm healthy after 24000 checks at time:10/24/2018 1:20:52 PM +*X* I'm healthy after 27000 checks at time:10/24/2018 1:20:58 PM +*X* I'm healthy after 30000 checks at time:10/24/2018 1:21:04 PM +Received 3072 MB so far +Bytes received: 3221225472 +DONE +*X* I'm healthy after 33000 checks at time:10/24/2018 1:21:10 PM +*X* I'm healthy after 36000 checks at time:10/24/2018 1:21:16 PM diff --git a/AmbrosiaTest/AmbrosiaTest/EndToEndStressIntegration_Test.cs b/AmbrosiaTest/AmbrosiaTest/EndToEndStressIntegration_Test.cs index a65a3851..b8d6324c 100644 --- a/AmbrosiaTest/AmbrosiaTest/EndToEndStressIntegration_Test.cs +++ b/AmbrosiaTest/AmbrosiaTest/EndToEndStressIntegration_Test.cs @@ -109,6 +109,93 @@ public void AMB_Basic_Test() MyUtils.VerifyAmbrosiaLogFile(testName, Convert.ToInt64(byteSize), true, true,AMB1.AMB_Version); } + [TestMethod] + public void AMB_Shard_Basic_Test() + { + // Test that one shard per server and client works + string testName = "shardbasictest"; + string clientJobName = testName + "clientjob"; + string serverName = testName + "server"; + string ambrosiaLogDir = ConfigurationManager.AppSettings["AmbrosiaLogDirectory"] + "\\"; + string byteSize = "3221225472"; + + Utilities MyUtils = new Utilities(); + + //AMB1 - Job + string logOutputFileName_AMB1 = testName + "_AMB1.log"; + AMB_Settings AMB1 = new AMB_Settings + { + AMB_ServiceName = clientJobName, + AMB_PortAppReceives = "1000", + AMB_PortAMBSends = "1001", + AMB_ServiceLogPath = ambrosiaLogDir, + AMB_CreateService = "A", + AMB_PauseAtStart = "N", + AMB_PersistLogs = "Y", + AMB_NewLogTriggerSize = "1000", + AMB_ActiveActive = "N", + AMB_Version = "0", + AMB_ShardID = "0", + }; + MyUtils.CallAMB(AMB1, logOutputFileName_AMB1, AMB_ModeConsts.RegisterInstance); + + //AMB2 + string logOutputFileName_AMB2 = testName + "_AMB2.log"; + AMB_Settings AMB2 = new AMB_Settings + { + AMB_ServiceName = serverName, + AMB_PortAppReceives = "2000", + AMB_PortAMBSends = "2001", + AMB_ServiceLogPath = ambrosiaLogDir, + AMB_CreateService = "A", + AMB_PauseAtStart = "N", + AMB_PersistLogs = "Y", + AMB_NewLogTriggerSize = "1000", + AMB_ActiveActive = "N", + AMB_Version = "0", + AMB_ShardID = "0", + }; + MyUtils.CallAMB(AMB2, logOutputFileName_AMB2, AMB_ModeConsts.RegisterInstance); + + //ImmCoord1 + string logOutputFileName_ImmCoord1 = testName + "_ImmCoord1.log"; + int ImmCoordProcessID1 = MyUtils.StartImmCoord(clientJobName, 1500, logOutputFileName_ImmCoord1); + + //ImmCoord2 + string logOutputFileName_ImmCoord2 = testName + "_ImmCoord2.log"; + int ImmCoordProcessID2 = MyUtils.StartImmCoord(serverName, 2500, logOutputFileName_ImmCoord2); + + //Client Job Call + string logOutputFileName_ClientJob = testName + "_ClientJob.log"; + int clientJobProcessID = MyUtils.StartPerfClientJob("1001", "1000", clientJobName, serverName, "32768", "3", logOutputFileName_ClientJob); + + //Server Call + string logOutputFileName_Server = testName + "_Server.log"; + int serverProcessID = MyUtils.StartPerfServer("2001", "2000", clientJobName, serverName, logOutputFileName_Server, 1, false); + + //Delay until client is done - also check Server just to make sure + bool pass = MyUtils.WaitForProcessToFinish(logOutputFileName_ClientJob, byteSize, 15, false, testName, true); // number of bytes processed + pass = MyUtils.WaitForProcessToFinish(logOutputFileName_Server, byteSize, 15, false, testName, true); + + // Stop things so file is freed up and can be opened in verify + MyUtils.KillProcess(clientJobProcessID); + MyUtils.KillProcess(serverProcessID); + MyUtils.KillProcess(ImmCoordProcessID1); + MyUtils.KillProcess(ImmCoordProcessID2); + + //Verify AMB + MyUtils.VerifyTestOutputFileToCmpFile(logOutputFileName_AMB1); + MyUtils.VerifyTestOutputFileToCmpFile(logOutputFileName_AMB2); + + // Verify Client + MyUtils.VerifyTestOutputFileToCmpFile(logOutputFileName_ClientJob); + + // Verify Server + MyUtils.VerifyTestOutputFileToCmpFile(logOutputFileName_Server); + + // Verify integrity of Ambrosia logs by replaying + MyUtils.VerifyAmbrosiaLogFile(testName, Convert.ToInt64(byteSize), true, true, AMB1.AMB_Version, shardID: 1); + } //** This test does 5 rounds of messages starting with 64MB and cutting in half each time //** Basically same as the basic test but passing giant message - the difference is in the job.exe call and that is it diff --git a/AmbrosiaTest/AmbrosiaTest/Utilities.cs b/AmbrosiaTest/AmbrosiaTest/Utilities.cs index 9bfe92a2..e06717da 100644 --- a/AmbrosiaTest/AmbrosiaTest/Utilities.cs +++ b/AmbrosiaTest/AmbrosiaTest/Utilities.cs @@ -28,11 +28,13 @@ public class AMB_Settings public string AMB_Version { get; set; } public string AMB_UpgradeToVersion { get; set; } public string AMB_ReplicaNumber { get; set; } - + public string AMB_ShardID { get; set; } + public string AMB_OldShards { get; set; } + public string AMB_NewShards { get; set; } } // These are the different modes of what the AMB is called - public enum AMB_ModeConsts { RegisterInstance, AddReplica, DebugInstance }; + public enum AMB_ModeConsts { RegisterInstance, AddReplica, DebugInstance, AddShard }; public class Utilities { @@ -460,7 +462,7 @@ public void VerifyTestOutputFileToCmpFile(string testOutputLogFile) // // Assumption: Test Output logs are .log and the cmp is the same file name but with .cmp extension //********************************************************************* - public void VerifyAmbrosiaLogFile(string testName, long numBytes, bool checkCmpFile, bool startWithFirstFile, string CurrentVersion, string optionalNumberOfClient = "", bool asyncTest = false) + public void VerifyAmbrosiaLogFile(string testName, long numBytes, bool checkCmpFile, bool startWithFirstFile, string CurrentVersion, string optionalNumberOfClient = "", bool asyncTest = false, long shardID = -1) { // Basically doing this for multi client stuff @@ -481,6 +483,11 @@ public void VerifyAmbrosiaLogFile(string testName, long numBytes, bool checkCmpF // used to get log file string ambrosiaClientLogDir = ConfigurationManager.AppSettings["AmbrosiaLogDirectory"] + "\\" + testName + "clientjob" + optionalMultiClientStartingPoint + "_" + CurrentVersion; string ambrosiaServerLogDir = ConfigurationManager.AppSettings["AmbrosiaLogDirectory"] + "\\" + testName + "server_" + CurrentVersion; + if (shardID != -1) + { + ambrosiaClientLogDir += "\\" + shardID.ToString(); + ambrosiaServerLogDir += "\\" + shardID.ToString(); + } string startingClientChkPtVersionNumber = "1"; string clientFirstFile = ""; @@ -578,6 +585,10 @@ public void VerifyAmbrosiaLogFile(string testName, long numBytes, bool checkCmpF AMB_PortAppReceives = "1000", AMB_PortAMBSends = "1001" }; + if (shardID != -1) + { + AMB1.AMB_ShardID = shardID.ToString(); + } CallAMB(AMB1, logOutputFileName_AMB1, AMB_ModeConsts.DebugInstance); // AMB for Server @@ -592,6 +603,10 @@ public void VerifyAmbrosiaLogFile(string testName, long numBytes, bool checkCmpF AMB_PortAppReceives = "2000", AMB_PortAMBSends = "2001" }; + if (shardID != -1) + { + AMB2.AMB_ShardID = shardID.ToString(); + } CallAMB(AMB2, logOutputFileName_AMB2, AMB_ModeConsts.DebugInstance); string logOutputFileName_ClientJob_Verify; @@ -634,7 +649,7 @@ public void VerifyAmbrosiaLogFile(string testName, long numBytes, bool checkCmpF } - public int StartImmCoord(string ImmCoordName, int portImmCoordListensAMB, string testOutputLogFile, bool ActiveActive=false, int replicaNum = 9999) + public int StartImmCoord(string ImmCoordName, int portImmCoordListensAMB, string testOutputLogFile, bool ActiveActive=false, int replicaNum = 9999, long shardID = -1) { // Launch the AMB process with these values @@ -646,7 +661,10 @@ public int StartImmCoord(string ImmCoordName, int portImmCoordListensAMB, string } string argString = "-i=" + ImmCoordName + " -p=" + portImmCoordListensAMB.ToString(); - + if (shardID != -1) + { + argString = argString + " -si=" + shardID.ToString(); + } // if Active Active then required to get replicanu if (ActiveActive) @@ -721,6 +739,9 @@ public void CallAMB(AMB_Settings AMBSettings, string testOutputLogFile, AMB_Mode if (AMBSettings.AMB_UpgradeToVersion != null) argString = argString + " -uv=" + AMBSettings.AMB_UpgradeToVersion; + if (AMBSettings.AMB_ShardID != null) + argString = argString + " -si=" + AMBSettings.AMB_ShardID; + // add current version if it exists if (AMBSettings.AMB_Version != null) argString = argString + " -cv=" + AMBSettings.AMB_Version; @@ -788,7 +809,43 @@ public void CallAMB(AMB_Settings AMBSettings, string testOutputLogFile, AMB_Mode // testing upgrade if (AMBSettings.AMB_TestingUpgrade != null && AMBSettings.AMB_TestingUpgrade != "N") argString = argString + " -tu"; + if (AMBSettings.AMB_ShardID != null) + argString = argString + " -si=" + AMBSettings.AMB_ShardID; + break; + + case AMB_ModeConsts.AddShard: + argString = "AddShard " + "-si=" + AMBSettings.AMB_ShardID + " -i=" + AMBSettings.AMB_ServiceName + + " -rp=" + AMBSettings.AMB_PortAppReceives + " -sp=" + AMBSettings.AMB_PortAMBSends + + " -r=" + AMBSettings.AMB_ReplicaNumber + " -os=" + AMBSettings.AMB_OldShards + + " -ns=" + AMBSettings.AMB_NewShards; + + // add Service log path + if (AMBSettings.AMB_ServiceLogPath != null) + argString = argString + " -l=" + AMBSettings.AMB_ServiceLogPath; + + // add pause at start + if (AMBSettings.AMB_PauseAtStart != null && AMBSettings.AMB_PauseAtStart != "N") + argString = argString + " -ps"; + + // add no persist logs at start + if (AMBSettings.AMB_PersistLogs != null && AMBSettings.AMB_PersistLogs != "Y") + argString = argString + " -npl"; + + // add new log trigger size if it exists + if (AMBSettings.AMB_NewLogTriggerSize != null) + argString = argString + " -lts=" + AMBSettings.AMB_NewLogTriggerSize; + + // add active active + if (AMBSettings.AMB_ActiveActive != null && AMBSettings.AMB_ActiveActive != "N") + argString = argString + " -aa"; + + // add current version if it exists + if (AMBSettings.AMB_Version != null) + argString = argString + " -cv=" + AMBSettings.AMB_Version; + // add upgrade version if it exists + if (AMBSettings.AMB_UpgradeToVersion != null) + argString = argString + " -uv=" + AMBSettings.AMB_UpgradeToVersion; break; } diff --git a/AmbrosiaTest/AmbrosiaTest/app.config b/AmbrosiaTest/AmbrosiaTest/app.config index 96bcda5d..a3b83fb4 100644 --- a/AmbrosiaTest/AmbrosiaTest/app.config +++ b/AmbrosiaTest/AmbrosiaTest/app.config @@ -11,7 +11,7 @@ - + diff --git a/ImmortalCoordinator/Program.cs b/ImmortalCoordinator/Program.cs index 0c3f96c5..e7e8b190 100644 --- a/ImmortalCoordinator/Program.cs +++ b/ImmortalCoordinator/Program.cs @@ -20,6 +20,7 @@ class Program private static string _secureNetworkClassName; private static bool _isActiveActive = false; private static int _replicaNumber = 0; + private static long _shardID = -1; static void Main(string[] args) { @@ -132,6 +133,7 @@ private static OptionSet ParseOptions(string[] args, out bool shouldShowHelp) { "ac|assemblyClass=", "The secure network assembly class.", ac => _secureNetworkClassName = ac }, { "ip|IPAddr=", "Override automatic self IP detection", i => _ipAddress = i }, { "h|help", "show this message and exit", h => showHelp = h != null }, + { "si|shardID=", "The shard ID", si => _shardID = long.Parse(si) }, }; try