diff --git a/Ambrosia/Ambrosia/Initialize.cs b/Ambrosia/Ambrosia/Initialize.cs new file mode 100644 index 00000000..697c26be --- /dev/null +++ b/Ambrosia/Ambrosia/Initialize.cs @@ -0,0 +1,137 @@ +using CRA.ClientLibrary; +using System.IO; +using System.Threading.Tasks; +using System.Xml.Serialization; + +namespace Ambrosia +{ + public class AmbrosiaRuntimeParams + { + public int serviceReceiveFromPort; + public int serviceSendToPort; + public string serviceName; + public string AmbrosiaBinariesLocation; + public string serviceLogPath; + public bool? createService; + public bool pauseAtStart; + public bool persistLogs; + public bool activeActive; + public long logTriggerSizeMB; + public string storageConnectionString; + public long currentVersion; + public long upgradeToVersion; + public long shardID; + public string oldShards; + public string newShards; + } + + public static class AmbrosiaRuntimeParms + { + public static bool _looseAttach = false; + } + + public class AmbrosiaNonShardedRuntime : VertexBase + { + private AmbrosiaRuntime Runtime { get; set; } + public AmbrosiaNonShardedRuntime() + { + Runtime = new AmbrosiaRuntime(); + } + + private long[] ParseLongs(string s) + { + if (s.Trim() == "") + { + return new long[] { }; + } + string[] shards = s.Split(','); + long[] ids = new long[shards.Length]; + + for (int i = 0; i < shards.Length; i++) + { + ids[i] = long.Parse(shards[i]); + } + return ids; + + } + + public override async Task InitializeAsync(object param) + { + // Workaround because of parameter type limitation in CRA + AmbrosiaRuntimeParams p = new AmbrosiaRuntimeParams(); + XmlSerializer xmlSerializer = new XmlSerializer(p.GetType()); + using (StringReader textReader = new StringReader((string)param)) + { + p = (AmbrosiaRuntimeParams)xmlSerializer.Deserialize(textReader); + } + + bool sharded = false; + + Runtime.Initialize( + p.serviceReceiveFromPort, + p.serviceSendToPort, + p.serviceName, + p.serviceLogPath, + p.createService, + p.pauseAtStart, + p.persistLogs, + p.activeActive, + p.logTriggerSizeMB, + p.storageConnectionString, + p.currentVersion, + p.upgradeToVersion, + ClientLibrary, + AddAsyncInputEndpoint, + AddAsyncOutputEndpoint, + ParseLongs(p.oldShards), + ParseLongs(p.newShards) + ); + + return; + } + } + + public class AmbrosiaShardedRuntime : ShardedVertexBase + { + private AmbrosiaRuntime Runtime { get; set; } + public AmbrosiaShardedRuntime() + { + Runtime = new AmbrosiaRuntime(); + } + + public override async Task InitializeAsync(int shardId, ShardingInfo shardingInfo, object param) + { + // Workaround because of parameter type limitation in CRA + AmbrosiaRuntimeParams p = new AmbrosiaRuntimeParams(); + XmlSerializer xmlSerializer = new XmlSerializer(p.GetType()); + using (StringReader textReader = new StringReader((string)param)) + { + p = (AmbrosiaRuntimeParams)xmlSerializer.Deserialize(textReader); + } + + bool sharded = true; + + Runtime.Initialize( + p.serviceReceiveFromPort, + p.serviceSendToPort, + p.serviceName, + p.serviceLogPath, + p.createService, + p.pauseAtStart, + p.persistLogs, + p.activeActive, + p.logTriggerSizeMB, + p.storageConnectionString, + p.currentVersion, + p.upgradeToVersion, + ClientLibrary, + AddAsyncInputEndpoint, + AddAsyncOutputEndpoint, + new long[0], + new long[0] + ); + + return; + } + } +} diff --git a/Ambrosia/Ambrosia/Program.cs b/Ambrosia/Ambrosia/Program.cs index d7bd97cb..b98db39d 100644 --- a/Ambrosia/Ambrosia/Program.cs +++ b/Ambrosia/Ambrosia/Program.cs @@ -1042,29 +1042,7 @@ public OutputConnectionRecord(AmbrosiaRuntime inAmbrosia) } } - public class AmbrosiaRuntimeParams - { - public int serviceReceiveFromPort; - public int serviceSendToPort; - public string serviceName; - public string AmbrosiaBinariesLocation; - public string serviceLogPath; - public bool? createService; - public bool pauseAtStart; - public bool persistLogs; - public bool activeActive; - public long logTriggerSizeMB; - public string storageConnectionString; - public long currentVersion; - public long upgradeToVersion; - } - - public static class AmbrosiaRuntimeParms - { - public static bool _looseAttach = false; - } - - public class AmbrosiaRuntime : VertexBase + public class AmbrosiaRuntime { #if _WINDOWS [DllImport("Kernel32.dll", CallingConvention = CallingConvention.Winapi)] @@ -2060,6 +2038,8 @@ public async Task FromStreamAsync(Stream stream, string otherProcess, string oth bool _sharded; internal bool _createService; long _shardID; + long[] _oldShards; + long[] _newShards; bool _runningRepro; long _currentVersion; long _upgradeToVersion; @@ -3870,38 +3850,6 @@ private void InitializeLogWriterStatics() #endif } - public override async Task InitializeAsync(object param) - { - InitializeLogWriterStatics(); - - // Workaround because of parameter type limitation in CRA - AmbrosiaRuntimeParams p = new AmbrosiaRuntimeParams(); - XmlSerializer xmlSerializer = new XmlSerializer(p.GetType()); - using (StringReader textReader = new StringReader((string)param)) - { - p = (AmbrosiaRuntimeParams)xmlSerializer.Deserialize(textReader); - } - - bool sharded = false; - - Initialize( - p.serviceReceiveFromPort, - p.serviceSendToPort, - p.serviceName, - p.serviceLogPath, - p.createService, - p.pauseAtStart, - p.persistLogs, - p.activeActive, - p.logTriggerSizeMB, - p.storageConnectionString, - p.currentVersion, - p.upgradeToVersion, - sharded - ); - return; - } - internal void RuntimeChecksOnProcessStart() { if (!_createService) @@ -3955,9 +3903,15 @@ public void Initialize(int serviceReceiveFromPort, string storageConnectionString, long currentVersion, long upgradeToVersion, - bool sharded + CRAClientLibrary coral, + Action addInput, + Action addOutput, + long[] oldShards, + long[] newShards ) { + InitializeLogWriterStatics(); + _runningRepro = false; _currentVersion = currentVersion; _upgradeToVersion = upgradeToVersion; @@ -3971,6 +3925,8 @@ bool sharded { Console.WriteLine("Ready ..."); } + _oldShards = oldShards; + _newShards = newShards; _persistLogs = persistLogs; _activeActive = activeActive; _newLogTriggerSize = logTriggerSizeMB * 1000000; @@ -3979,11 +3935,9 @@ bool sharded _localServiceSendToPort = serviceSendToPort; _serviceName = serviceName; _storageConnectionString = storageConnectionString; - _sharded = sharded; - _coral = ClientLibrary; + _coral = coral; Console.WriteLine("Logs directory: {0}", _serviceLogPath); - if (createService == null) { if (_logWriterStatics.DirectoryExists(RootDirectory())) @@ -3995,10 +3949,10 @@ bool sharded createService = true; } } - AddAsyncInputEndpoint(AmbrosiaDataInputsName, new AmbrosiaInput(this, "data")); - AddAsyncInputEndpoint(AmbrosiaControlInputsName, new AmbrosiaInput(this, "control")); - AddAsyncOutputEndpoint(AmbrosiaDataOutputsName, new AmbrosiaOutput(this, "data")); - AddAsyncOutputEndpoint(AmbrosiaControlOutputsName, new AmbrosiaOutput(this, "control")); + addInput(AmbrosiaDataInputsName, new AmbrosiaInput(this, "data")); + addInput(AmbrosiaControlInputsName, new AmbrosiaInput(this, "control")); + addOutput(AmbrosiaDataOutputsName, new AmbrosiaOutput(this, "data")); + addOutput(AmbrosiaControlOutputsName, new AmbrosiaOutput(this, "control")); _createService = createService.Value; RecoverOrStartAsync().Wait(); } @@ -4044,6 +3998,45 @@ class Program private static long _logTriggerSizeMB = 1000; private static int _currentVersion = 0; private static long _upgradeVersion = -1; + private static long _shardID = -1; + private static string _oldShards = ""; + private static string _newShards = ""; + + private static void DefineVertex(CRAClientLibrary client, string vertexDefinition, bool sharded) + { + Task result; + if (!sharded) + { + result = client.DefineVertexAsync(vertexDefinition, () => new AmbrosiaNonShardedRuntime()); + } else + { + result = client.DefineVertexAsync(vertexDefinition, () => new AmbrosiaShardedRuntime()); + } + + if (result.GetAwaiter().GetResult() != CRAErrorCode.Success) + { + throw new Exception(); + } + } + + private static void InstantiateVertex(CRAClientLibrary client, string instanceName, string vertexName, string vertexDefinition, object vertexParameter, long shardID) + { + CRAErrorCode result; + if (shardID == -1) + { + result = client.InstantiateVertexAsync(instanceName, vertexName, vertexDefinition, vertexParameter).GetAwaiter().GetResult(); + } + else + { + ConcurrentDictionary vertexShards = new ConcurrentDictionary(); + vertexShards[instanceName] = (int)shardID; + result = client.InstantiateShardedVertex(vertexName, vertexDefinition, vertexParameter, vertexShards); + } + if (result != CRAErrorCode.Success) + { + throw new Exception(); + } + } static void Main(string[] args) { @@ -4057,6 +4050,7 @@ static void Main(string[] args) _isTestingUpgrade, _serviceReceiveFromPort, _serviceSendToPort); return; case LocalAmbrosiaRuntimeModes.AddReplica: + case LocalAmbrosiaRuntimeModes.AddShard: case LocalAmbrosiaRuntimeModes.RegisterInstance: if (_runtimeMode == LocalAmbrosiaRuntimeModes.AddReplica) { @@ -4068,6 +4062,7 @@ static void Main(string[] args) client.DisableArtifactUploading(); var replicaName = $"{_instanceName}{_replicaNumber}"; + AmbrosiaRuntimeParams param = new AmbrosiaRuntimeParams(); param.createService = _recoveryMode == AmbrosiaRecoveryModes.A ? (bool?)null @@ -4084,13 +4079,13 @@ static void Main(string[] args) param.serviceLogPath = _serviceLogPath; param.AmbrosiaBinariesLocation = _binariesLocation; param.storageConnectionString = Environment.GetEnvironmentVariable("AZURE_STORAGE_CONN_STRING"); + param.shardID = _shardID; + param.oldShards = _oldShards; + param.newShards = _newShards; try { - if (client.DefineVertexAsync(param.AmbrosiaBinariesLocation, () => new AmbrosiaRuntime()).GetAwaiter().GetResult() != CRAErrorCode.Success) - { - throw new Exception(); - } + DefineVertex(client, param.AmbrosiaBinariesLocation, _shardID >= 0); // Workaround because of limitation in parameter serialization in CRA XmlSerializer xmlSerializer = new XmlSerializer(param.GetType()); @@ -4100,11 +4095,7 @@ static void Main(string[] args) xmlSerializer.Serialize(textWriter, param); serializedParams = textWriter.ToString(); } - - if (client.InstantiateVertexAsync(replicaName, param.serviceName, param.AmbrosiaBinariesLocation, serializedParams).GetAwaiter().GetResult() != CRAErrorCode.Success) - { - throw new Exception(); - } + InstantiateVertex(client, replicaName, param.serviceName, param.AmbrosiaBinariesLocation, serializedParams, _shardID); client.AddEndpointAsync(param.serviceName, AmbrosiaRuntime.AmbrosiaDataInputsName, true, true).Wait(); client.AddEndpointAsync(param.serviceName, AmbrosiaRuntime.AmbrosiaDataOutputsName, false, true).Wait(); client.AddEndpointAsync(param.serviceName, AmbrosiaRuntime.AmbrosiaControlInputsName, true, true).Wait(); @@ -4137,6 +4128,7 @@ private static OptionSet ParseOptions(string[] args, out bool shouldShowHelp) { "rp|receivePort=", "The service receive from port [REQUIRED].", rp => _serviceReceiveFromPort = int.Parse(rp) }, { "sp|sendPort=", "The service send to port. [REQUIRED]", sp => _serviceSendToPort = int.Parse(sp) }, { "l|log=", "The service log path.", l => _serviceLogPath = l }, + {"si|shardID=", "The shard ID of the instance", si => _shardID = long.Parse(si) }, }; var helpOption = new OptionSet @@ -4170,16 +4162,23 @@ private static OptionSet ParseOptions(string[] args, out bool shouldShowHelp) { "tu|testingUpgrade", "Is testing upgrade.", u => _isTestingUpgrade = true }, }); + var addShardOptionSet = new OptionSet + { + {"os|oldShards=", "Comma separated list of shards to recover from [REQUIRED].", os => _oldShards = os }, + {"ns|newShards=", "Comma separated list of new shards being created [REQUIRED].", ns => _newShards = ns } + }.AddMany(registerInstanceOptionSet); + registerInstanceOptionSet = registerInstanceOptionSet.AddMany(helpOption); addReplicaOptionSet = addReplicaOptionSet.AddMany(helpOption); debugInstanceOptionSet = debugInstanceOptionSet.AddMany(helpOption); - + addShardOptionSet = addShardOptionSet.AddMany(helpOption); var runtimeModeToOptionSet = new Dictionary { { LocalAmbrosiaRuntimeModes.RegisterInstance, registerInstanceOptionSet}, { LocalAmbrosiaRuntimeModes.AddReplica, addReplicaOptionSet}, { LocalAmbrosiaRuntimeModes.DebugInstance, debugInstanceOptionSet}, + { LocalAmbrosiaRuntimeModes.AddShard, addShardOptionSet }, }; _runtimeMode = default(LocalAmbrosiaRuntimeModes); @@ -4212,6 +4211,7 @@ public enum LocalAmbrosiaRuntimeModes AddReplica, RegisterInstance, DebugInstance, + AddShard, } public enum AmbrosiaRecoveryModes @@ -4246,6 +4246,14 @@ private static void ValidateOptions(OptionSet options, bool shouldShowHelp) } } + if (_runtimeMode == LocalAmbrosiaRuntimeModes.AddShard) + { + if (_shardID == -1) + { + errorMessage += "Shard ID is required.\n"; + } + } + // handles the case when an upgradeversion is not specified if (_upgradeVersion == -1) { diff --git a/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_AMB1.cmp b/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_AMB1.cmp new file mode 100644 index 00000000..3797d308 --- /dev/null +++ b/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_AMB1.cmp @@ -0,0 +1 @@ +The CRA instance appears to be down. Restart it and this vertex will be instantiated automatically diff --git a/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_AMB2.cmp b/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_AMB2.cmp new file mode 100644 index 00000000..3797d308 --- /dev/null +++ b/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_AMB2.cmp @@ -0,0 +1 @@ +The CRA instance appears to be down. Restart it and this vertex will be instantiated automatically diff --git a/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_ClientJob.cmp b/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_ClientJob.cmp new file mode 100644 index 00000000..a0010664 --- /dev/null +++ b/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_ClientJob.cmp @@ -0,0 +1,9 @@ +Bytes per RPC Throughput (GB/sec) +*X* 32768 0.0538327740000449 +Service Received 1024 MB so far +*X* 16384 0.0709862409498754 +Service Received 2048 MB so far +*X* 8192 0.0695878693925042 +Service Received 3072 MB so far +Bytes received: 3221225472 +DONE diff --git a/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_ClientJob_Verify.cmp b/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_ClientJob_Verify.cmp new file mode 100644 index 00000000..57436b68 --- /dev/null +++ b/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_ClientJob_Verify.cmp @@ -0,0 +1,9 @@ +Bytes per RPC Throughput (GB/sec) +*X* 32768 0.00943044129323776 +Service Received 1024 MB so far +*X* 16384 0.00989352845861985 +Service Received 2048 MB so far +*X* 8192 0.00993638850272688 +Service Received 3072 MB so far +Bytes received: 3221225472 +DONE diff --git a/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_Server.cmp b/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_Server.cmp new file mode 100644 index 00000000..29933c01 --- /dev/null +++ b/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_Server.cmp @@ -0,0 +1,18 @@ +*X* At checkpoint, received 0 messages +*X* becoming primary +*X* Server in Entry Point +*X* I'm healthy after 3000 checks at time:10/24/2018 1:15:13 PM +*X* I'm healthy after 6000 checks at time:10/24/2018 1:15:19 PM +*X* At checkpoint, received 30564 messages +Received 1024 MB so far +*X* I'm healthy after 9000 checks at time:10/24/2018 1:15:25 PM +*X* I'm healthy after 12000 checks at time:10/24/2018 1:15:31 PM +*X* At checkpoint, received 89584 messages +Received 2048 MB so far +*X* I'm healthy after 15000 checks at time:10/24/2018 1:15:37 PM +*X* I'm healthy after 18000 checks at time:10/24/2018 1:15:43 PM +*X* At checkpoint, received 202934 messages +*X* I'm healthy after 21000 checks at time:10/24/2018 1:15:49 PM +Received 3072 MB so far +Bytes received: 3221225472 +DONE diff --git a/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_Server_Verify.cmp b/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_Server_Verify.cmp new file mode 100644 index 00000000..a3e33993 --- /dev/null +++ b/AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_Server_Verify.cmp @@ -0,0 +1,18 @@ +*X* Server in Entry Point +*X* I'm healthy after 3000 checks at time:10/24/2018 1:20:10 PM +*X* I'm healthy after 6000 checks at time:10/24/2018 1:20:16 PM +Received 1024 MB so far +*X* I'm healthy after 9000 checks at time:10/24/2018 1:20:22 PM +*X* I'm healthy after 12000 checks at time:10/24/2018 1:20:28 PM +Received 2048 MB so far +*X* I'm healthy after 15000 checks at time:10/24/2018 1:20:34 PM +*X* I'm healthy after 18000 checks at time:10/24/2018 1:20:40 PM +*X* I'm healthy after 21000 checks at time:10/24/2018 1:20:46 PM +*X* I'm healthy after 24000 checks at time:10/24/2018 1:20:52 PM +*X* I'm healthy after 27000 checks at time:10/24/2018 1:20:58 PM +*X* I'm healthy after 30000 checks at time:10/24/2018 1:21:04 PM +Received 3072 MB so far +Bytes received: 3221225472 +DONE +*X* I'm healthy after 33000 checks at time:10/24/2018 1:21:10 PM +*X* I'm healthy after 36000 checks at time:10/24/2018 1:21:16 PM diff --git a/AmbrosiaTest/AmbrosiaTest/EndToEndStressIntegration_Test.cs b/AmbrosiaTest/AmbrosiaTest/EndToEndStressIntegration_Test.cs index a65a3851..b8d6324c 100644 --- a/AmbrosiaTest/AmbrosiaTest/EndToEndStressIntegration_Test.cs +++ b/AmbrosiaTest/AmbrosiaTest/EndToEndStressIntegration_Test.cs @@ -109,6 +109,93 @@ public void AMB_Basic_Test() MyUtils.VerifyAmbrosiaLogFile(testName, Convert.ToInt64(byteSize), true, true,AMB1.AMB_Version); } + [TestMethod] + public void AMB_Shard_Basic_Test() + { + // Test that one shard per server and client works + string testName = "shardbasictest"; + string clientJobName = testName + "clientjob"; + string serverName = testName + "server"; + string ambrosiaLogDir = ConfigurationManager.AppSettings["AmbrosiaLogDirectory"] + "\\"; + string byteSize = "3221225472"; + + Utilities MyUtils = new Utilities(); + + //AMB1 - Job + string logOutputFileName_AMB1 = testName + "_AMB1.log"; + AMB_Settings AMB1 = new AMB_Settings + { + AMB_ServiceName = clientJobName, + AMB_PortAppReceives = "1000", + AMB_PortAMBSends = "1001", + AMB_ServiceLogPath = ambrosiaLogDir, + AMB_CreateService = "A", + AMB_PauseAtStart = "N", + AMB_PersistLogs = "Y", + AMB_NewLogTriggerSize = "1000", + AMB_ActiveActive = "N", + AMB_Version = "0", + AMB_ShardID = "0", + }; + MyUtils.CallAMB(AMB1, logOutputFileName_AMB1, AMB_ModeConsts.RegisterInstance); + + //AMB2 + string logOutputFileName_AMB2 = testName + "_AMB2.log"; + AMB_Settings AMB2 = new AMB_Settings + { + AMB_ServiceName = serverName, + AMB_PortAppReceives = "2000", + AMB_PortAMBSends = "2001", + AMB_ServiceLogPath = ambrosiaLogDir, + AMB_CreateService = "A", + AMB_PauseAtStart = "N", + AMB_PersistLogs = "Y", + AMB_NewLogTriggerSize = "1000", + AMB_ActiveActive = "N", + AMB_Version = "0", + AMB_ShardID = "0", + }; + MyUtils.CallAMB(AMB2, logOutputFileName_AMB2, AMB_ModeConsts.RegisterInstance); + + //ImmCoord1 + string logOutputFileName_ImmCoord1 = testName + "_ImmCoord1.log"; + int ImmCoordProcessID1 = MyUtils.StartImmCoord(clientJobName, 1500, logOutputFileName_ImmCoord1); + + //ImmCoord2 + string logOutputFileName_ImmCoord2 = testName + "_ImmCoord2.log"; + int ImmCoordProcessID2 = MyUtils.StartImmCoord(serverName, 2500, logOutputFileName_ImmCoord2); + + //Client Job Call + string logOutputFileName_ClientJob = testName + "_ClientJob.log"; + int clientJobProcessID = MyUtils.StartPerfClientJob("1001", "1000", clientJobName, serverName, "32768", "3", logOutputFileName_ClientJob); + + //Server Call + string logOutputFileName_Server = testName + "_Server.log"; + int serverProcessID = MyUtils.StartPerfServer("2001", "2000", clientJobName, serverName, logOutputFileName_Server, 1, false); + + //Delay until client is done - also check Server just to make sure + bool pass = MyUtils.WaitForProcessToFinish(logOutputFileName_ClientJob, byteSize, 15, false, testName, true); // number of bytes processed + pass = MyUtils.WaitForProcessToFinish(logOutputFileName_Server, byteSize, 15, false, testName, true); + + // Stop things so file is freed up and can be opened in verify + MyUtils.KillProcess(clientJobProcessID); + MyUtils.KillProcess(serverProcessID); + MyUtils.KillProcess(ImmCoordProcessID1); + MyUtils.KillProcess(ImmCoordProcessID2); + + //Verify AMB + MyUtils.VerifyTestOutputFileToCmpFile(logOutputFileName_AMB1); + MyUtils.VerifyTestOutputFileToCmpFile(logOutputFileName_AMB2); + + // Verify Client + MyUtils.VerifyTestOutputFileToCmpFile(logOutputFileName_ClientJob); + + // Verify Server + MyUtils.VerifyTestOutputFileToCmpFile(logOutputFileName_Server); + + // Verify integrity of Ambrosia logs by replaying + MyUtils.VerifyAmbrosiaLogFile(testName, Convert.ToInt64(byteSize), true, true, AMB1.AMB_Version, shardID: 1); + } //** This test does 5 rounds of messages starting with 64MB and cutting in half each time //** Basically same as the basic test but passing giant message - the difference is in the job.exe call and that is it diff --git a/AmbrosiaTest/AmbrosiaTest/Utilities.cs b/AmbrosiaTest/AmbrosiaTest/Utilities.cs index 9bfe92a2..e06717da 100644 --- a/AmbrosiaTest/AmbrosiaTest/Utilities.cs +++ b/AmbrosiaTest/AmbrosiaTest/Utilities.cs @@ -28,11 +28,13 @@ public class AMB_Settings public string AMB_Version { get; set; } public string AMB_UpgradeToVersion { get; set; } public string AMB_ReplicaNumber { get; set; } - + public string AMB_ShardID { get; set; } + public string AMB_OldShards { get; set; } + public string AMB_NewShards { get; set; } } // These are the different modes of what the AMB is called - public enum AMB_ModeConsts { RegisterInstance, AddReplica, DebugInstance }; + public enum AMB_ModeConsts { RegisterInstance, AddReplica, DebugInstance, AddShard }; public class Utilities { @@ -460,7 +462,7 @@ public void VerifyTestOutputFileToCmpFile(string testOutputLogFile) // // Assumption: Test Output logs are .log and the cmp is the same file name but with .cmp extension //********************************************************************* - public void VerifyAmbrosiaLogFile(string testName, long numBytes, bool checkCmpFile, bool startWithFirstFile, string CurrentVersion, string optionalNumberOfClient = "", bool asyncTest = false) + public void VerifyAmbrosiaLogFile(string testName, long numBytes, bool checkCmpFile, bool startWithFirstFile, string CurrentVersion, string optionalNumberOfClient = "", bool asyncTest = false, long shardID = -1) { // Basically doing this for multi client stuff @@ -481,6 +483,11 @@ public void VerifyAmbrosiaLogFile(string testName, long numBytes, bool checkCmpF // used to get log file string ambrosiaClientLogDir = ConfigurationManager.AppSettings["AmbrosiaLogDirectory"] + "\\" + testName + "clientjob" + optionalMultiClientStartingPoint + "_" + CurrentVersion; string ambrosiaServerLogDir = ConfigurationManager.AppSettings["AmbrosiaLogDirectory"] + "\\" + testName + "server_" + CurrentVersion; + if (shardID != -1) + { + ambrosiaClientLogDir += "\\" + shardID.ToString(); + ambrosiaServerLogDir += "\\" + shardID.ToString(); + } string startingClientChkPtVersionNumber = "1"; string clientFirstFile = ""; @@ -578,6 +585,10 @@ public void VerifyAmbrosiaLogFile(string testName, long numBytes, bool checkCmpF AMB_PortAppReceives = "1000", AMB_PortAMBSends = "1001" }; + if (shardID != -1) + { + AMB1.AMB_ShardID = shardID.ToString(); + } CallAMB(AMB1, logOutputFileName_AMB1, AMB_ModeConsts.DebugInstance); // AMB for Server @@ -592,6 +603,10 @@ public void VerifyAmbrosiaLogFile(string testName, long numBytes, bool checkCmpF AMB_PortAppReceives = "2000", AMB_PortAMBSends = "2001" }; + if (shardID != -1) + { + AMB2.AMB_ShardID = shardID.ToString(); + } CallAMB(AMB2, logOutputFileName_AMB2, AMB_ModeConsts.DebugInstance); string logOutputFileName_ClientJob_Verify; @@ -634,7 +649,7 @@ public void VerifyAmbrosiaLogFile(string testName, long numBytes, bool checkCmpF } - public int StartImmCoord(string ImmCoordName, int portImmCoordListensAMB, string testOutputLogFile, bool ActiveActive=false, int replicaNum = 9999) + public int StartImmCoord(string ImmCoordName, int portImmCoordListensAMB, string testOutputLogFile, bool ActiveActive=false, int replicaNum = 9999, long shardID = -1) { // Launch the AMB process with these values @@ -646,7 +661,10 @@ public int StartImmCoord(string ImmCoordName, int portImmCoordListensAMB, string } string argString = "-i=" + ImmCoordName + " -p=" + portImmCoordListensAMB.ToString(); - + if (shardID != -1) + { + argString = argString + " -si=" + shardID.ToString(); + } // if Active Active then required to get replicanu if (ActiveActive) @@ -721,6 +739,9 @@ public void CallAMB(AMB_Settings AMBSettings, string testOutputLogFile, AMB_Mode if (AMBSettings.AMB_UpgradeToVersion != null) argString = argString + " -uv=" + AMBSettings.AMB_UpgradeToVersion; + if (AMBSettings.AMB_ShardID != null) + argString = argString + " -si=" + AMBSettings.AMB_ShardID; + // add current version if it exists if (AMBSettings.AMB_Version != null) argString = argString + " -cv=" + AMBSettings.AMB_Version; @@ -788,7 +809,43 @@ public void CallAMB(AMB_Settings AMBSettings, string testOutputLogFile, AMB_Mode // testing upgrade if (AMBSettings.AMB_TestingUpgrade != null && AMBSettings.AMB_TestingUpgrade != "N") argString = argString + " -tu"; + if (AMBSettings.AMB_ShardID != null) + argString = argString + " -si=" + AMBSettings.AMB_ShardID; + break; + + case AMB_ModeConsts.AddShard: + argString = "AddShard " + "-si=" + AMBSettings.AMB_ShardID + " -i=" + AMBSettings.AMB_ServiceName + + " -rp=" + AMBSettings.AMB_PortAppReceives + " -sp=" + AMBSettings.AMB_PortAMBSends + + " -r=" + AMBSettings.AMB_ReplicaNumber + " -os=" + AMBSettings.AMB_OldShards + + " -ns=" + AMBSettings.AMB_NewShards; + + // add Service log path + if (AMBSettings.AMB_ServiceLogPath != null) + argString = argString + " -l=" + AMBSettings.AMB_ServiceLogPath; + + // add pause at start + if (AMBSettings.AMB_PauseAtStart != null && AMBSettings.AMB_PauseAtStart != "N") + argString = argString + " -ps"; + + // add no persist logs at start + if (AMBSettings.AMB_PersistLogs != null && AMBSettings.AMB_PersistLogs != "Y") + argString = argString + " -npl"; + + // add new log trigger size if it exists + if (AMBSettings.AMB_NewLogTriggerSize != null) + argString = argString + " -lts=" + AMBSettings.AMB_NewLogTriggerSize; + + // add active active + if (AMBSettings.AMB_ActiveActive != null && AMBSettings.AMB_ActiveActive != "N") + argString = argString + " -aa"; + + // add current version if it exists + if (AMBSettings.AMB_Version != null) + argString = argString + " -cv=" + AMBSettings.AMB_Version; + // add upgrade version if it exists + if (AMBSettings.AMB_UpgradeToVersion != null) + argString = argString + " -uv=" + AMBSettings.AMB_UpgradeToVersion; break; } diff --git a/AmbrosiaTest/AmbrosiaTest/app.config b/AmbrosiaTest/AmbrosiaTest/app.config index 96bcda5d..a3b83fb4 100644 --- a/AmbrosiaTest/AmbrosiaTest/app.config +++ b/AmbrosiaTest/AmbrosiaTest/app.config @@ -11,7 +11,7 @@ - + diff --git a/ImmortalCoordinator/Program.cs b/ImmortalCoordinator/Program.cs index 0e449bfc..e7e8b190 100644 --- a/ImmortalCoordinator/Program.cs +++ b/ImmortalCoordinator/Program.cs @@ -20,6 +20,7 @@ class Program private static string _secureNetworkClassName; private static bool _isActiveActive = false; private static int _replicaNumber = 0; + private static long _shardID = -1; static void Main(string[] args) { @@ -96,7 +97,7 @@ static void Main(string[] args) dataProvider, descriptor, connectionsPoolPerWorker); worker.DisableDynamicLoading(); - worker.SideloadVertex(new AmbrosiaRuntime(), "ambrosia"); + worker.SideloadVertex(new AmbrosiaNonShardedRuntime(), "ambrosia"); worker.Start(); } @@ -132,6 +133,7 @@ private static OptionSet ParseOptions(string[] args, out bool shouldShowHelp) { "ac|assemblyClass=", "The secure network assembly class.", ac => _secureNetworkClassName = ac }, { "ip|IPAddr=", "Override automatic self IP detection", i => _ipAddress = i }, { "h|help", "show this message and exit", h => showHelp = h != null }, + { "si|shardID=", "The shard ID", si => _shardID = long.Parse(si) }, }; try