diff --git a/CHANGES b/CHANGES index 42d164f55d..10983c78b7 100644 --- a/CHANGES +++ b/CHANGES @@ -2,6 +2,7 @@ Tajo Change Log Release 0.12.0 - unreleased + NEW FEATURES TAJO-2122: PullServer as an Auxiliary service of Yarn. (jihoon) diff --git a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java index 8de4cb9fa3..48fe79a6ca 100644 --- a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -302,13 +302,16 @@ public HBaseTestClusterUtil getHBaseUtil() { //////////////////////////////////////////////////////// // Catalog Section //////////////////////////////////////////////////////// - public CatalogServer startCatalogCluster() throws Exception { + private void initCatalogCluster() throws Exception { if(isCatalogServerRunning) throw new IOException("Catalog Cluster already running"); CatalogTestingUtil.configureCatalog(conf, clusterTestBuildDir.getAbsolutePath()); LOG.info("Apache Derby repository is set to " + conf.get(CatalogConstants.CATALOG_URI)); conf.setVar(ConfVars.CATALOG_ADDRESS, "localhost:0"); + } + public CatalogServer startCatalogCluster() throws Exception { + initCatalogCluster(); catalogServer = new CatalogServer(); catalogServer.init(conf); catalogServer.start(); @@ -460,6 +463,15 @@ private void startTajoWorkers(int numSlaves) throws Exception { } } + public void startMaster() throws Exception { + initCatalogCluster(); + tajoMaster = new TajoMaster(); + tajoMaster.init(conf); + tajoMaster.start(); + isCatalogServerRunning = true; + catalogServer = tajoMaster.getCatalogServer(); + } + public TajoMaster getMaster() { return this.tajoMaster; } diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java b/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java index 97c3b7d396..27bcae666b 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java @@ -18,54 +18,102 @@ package org.apache.tajo.ha; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.*; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.NetUtils; -import org.apache.tajo.TajoConstants; -import org.apache.tajo.TajoTestingCluster; -import org.apache.tajo.TpchTestBase; +import org.apache.tajo.*; +import org.apache.tajo.catalog.*; import org.apache.tajo.client.TajoClient; import org.apache.tajo.client.TajoClientImpl; +import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.master.TajoMaster; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.service.ServiceTrackerFactory; +import org.apache.tajo.storage.*; +import org.apache.tajo.util.CommonTestingUtil; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import static junit.framework.Assert.assertNotNull; import static junit.framework.Assert.assertTrue; import static junit.framework.TestCase.assertEquals; +import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.junit.Assert.assertNotEquals; public class TestHAServiceHDFSImpl { - private TajoTestingCluster cluster; - - private TajoMaster primaryMaster; - private TajoMaster backupMaster; + private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestHAServiceHDFSImpl"; + private TajoTestingCluster util; + private FileTablespace sm; + private CatalogService catalog; + private Path testDir; + private TableDesc employee; + + @Before + public void setUp() throws Exception { + util = new TajoTestingCluster(true); + + util.startMaster(); + catalog = util.getCatalogService(); + + sm = TablespaceManager.getLocalFs(); + + testDir = CommonTestingUtil.getTestDir(TEST_PATH); + + Schema schema = SchemaBuilder.builder() + .add("managerid", TajoDataTypes.Type.INT4) + .add("empid", TajoDataTypes.Type.INT4) + .add("deptname", TajoDataTypes.Type.TEXT) + .build(); + + TableMeta employeeMeta = CatalogUtil.newTableMeta(BuiltinStorages.TEXT, util.getConfiguration()); + + Path employeePath = new Path(testDir, "employee.csv"); + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()) + .getAppender(employeeMeta, schema, employeePath); + appender.enableStats(); + appender.init(); + appender.flush(); + appender.close(); + + employee = new TableDesc("default.employee", schema, employeeMeta, employeePath.toUri()); + catalog.createTable(employee); + } - private Path haPath, activePath, backupPath; + @After + public void tearDown() throws Exception { + CommonTestingUtil.cleanupTestDir(TEST_PATH); + util.shutdownCatalogCluster(); + } - //@Test TODO: enable this test after TAJO-1866 fixed + @Test public final void testAutoFailOver() throws Exception { - cluster = TpchTestBase.getInstance().getTestingCluster(); + TajoMaster backupMaster = null; try { - FileSystem fs = cluster.getDefaultFileSystem(); + TajoMaster primaryMaster = util.getMaster(); + assertNotNull(primaryMaster); - TajoConf primaryConf = setConfigForHAMaster(); - primaryMaster = new TajoMaster(); - primaryMaster.init(primaryConf); - primaryMaster.start(); - - TajoConf backupConf = setConfigForHAMaster(); + TajoConf conf = getBackupMasterConfiguration(); backupMaster = new TajoMaster(); - backupMaster.init(backupConf); + backupMaster.init(conf); backupMaster.start(); + Assert.assertNotNull(backupMaster); - ServiceTracker tracker = ServiceTrackerFactory.get(primaryConf); - + ServiceTracker tracker = ServiceTrackerFactory.get(util.getConfiguration()); assertNotEquals(primaryMaster.getMasterName(), backupMaster.getMasterName()); - verifySystemDirectories(fs); + + FileSystem fs = sm.getFileSystem(); + Path haPath = TajoConf.getSystemHADir(util.getConfiguration()); + assertTrue(fs.exists(haPath)); + + Path activePath = new Path(haPath, TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME); + assertTrue(fs.exists(activePath)); + + Path backupPath = new Path(haPath, TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME); + assertTrue(fs.exists(backupPath)); assertEquals(2, fs.listStatus(activePath).length); assertEquals(1, fs.listStatus(backupPath).length); @@ -75,68 +123,43 @@ public final void testAutoFailOver() throws Exception { assertTrue(fs.exists(new Path(backupPath, backupMaster.getMasterName().replaceAll(":", "_")))); createDatabaseAndTable(tracker); - verifyDataBaseAndTable(tracker); + existDataBaseAndTable(tracker); primaryMaster.stop(); - verifyDataBaseAndTable(tracker); + existDataBaseAndTable(tracker); assertTrue(fs.exists(new Path(activePath, HAConstants.ACTIVE_LOCK_FILE))); assertTrue(fs.exists(new Path(activePath, backupMaster.getMasterName().replaceAll(":", "_")))); assertEquals(2, fs.listStatus(activePath).length); assertEquals(0, fs.listStatus(backupPath).length); + + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, "employee")); } finally { if (backupMaster != null) { - backupMaster.stop(); + backupMaster.close(); } } + } - private TajoConf setConfigForHAMaster() { - TajoConf conf = new TajoConf(cluster.getConfiguration()); - - conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, - "localhost:" + NetUtils.getFreeSocketPort()); - conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS, - "localhost:" + NetUtils.getFreeSocketPort()); - conf.setVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, - "localhost:" + NetUtils.getFreeSocketPort()); - conf.setVar(TajoConf.ConfVars.CATALOG_ADDRESS, - "localhost:" + NetUtils.getFreeSocketPort()); - conf.setVar(TajoConf.ConfVars.TAJO_MASTER_INFO_ADDRESS, - "localhost:" + NetUtils.getFreeSocketPort()); - conf.setVar(TajoConf.ConfVars.REST_SERVICE_ADDRESS, - "localhost:" + NetUtils.getFreeSocketPort()); + private TajoConf getBackupMasterConfiguration() { + TajoConf conf = new TajoConf(util.getConfiguration()); + + conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, "localhost:" + NetUtils.getFreeSocketPort()); + conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS, "localhost:" + NetUtils.getFreeSocketPort()); + conf.setVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, "localhost:" + NetUtils.getFreeSocketPort()); + conf.setVar(TajoConf.ConfVars.CATALOG_ADDRESS, "localhost:" + NetUtils.getFreeSocketPort()); + conf.setVar(TajoConf.ConfVars.TAJO_MASTER_INFO_ADDRESS, "localhost:" + NetUtils.getFreeSocketPort()); + conf.setVar(TajoConf.ConfVars.REST_SERVICE_ADDRESS, "localhost:" + NetUtils.getFreeSocketPort()); conf.setBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE, true); conf.setIntVar(TajoConf.ConfVars.TAJO_MASTER_HA_MONITOR_INTERVAL, 1000); - //Client API service RPC Server - conf.setIntVar(TajoConf.ConfVars.MASTER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM, 2); - conf.setIntVar(TajoConf.ConfVars.WORKER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM, 2); - - // Internal RPC Server - conf.setIntVar(TajoConf.ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM, 2); - conf.setIntVar(TajoConf.ConfVars.QUERY_MASTER_RPC_SERVER_WORKER_THREAD_NUM, 2); - conf.setIntVar(TajoConf.ConfVars.WORKER_RPC_SERVER_WORKER_THREAD_NUM, 2); - conf.setIntVar(TajoConf.ConfVars.CATALOG_RPC_SERVER_WORKER_THREAD_NUM, 2); - conf.setIntVar(TajoConf.ConfVars.SHUFFLE_RPC_SERVER_WORKER_THREAD_NUM, 2); - return conf; } - private void verifySystemDirectories(FileSystem fs) throws Exception { - haPath = TajoConf.getSystemHADir(cluster.getConfiguration()); - assertTrue(fs.exists(haPath)); - - activePath = new Path(haPath, TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME); - assertTrue(fs.exists(activePath)); - - backupPath = new Path(haPath, TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME); - assertTrue(fs.exists(backupPath)); - } - private void createDatabaseAndTable(ServiceTracker tracker) throws Exception { TajoClient client = null; try { @@ -148,7 +171,7 @@ private void createDatabaseAndTable(ServiceTracker tracker) throws Exception { } } - private void verifyDataBaseAndTable(ServiceTracker tracker) throws Exception { + private void existDataBaseAndTable(ServiceTracker tracker) throws Exception { TajoClient client = null; try { client = new TajoClientImpl(tracker); @@ -159,4 +182,5 @@ private void verifyDataBaseAndTable(ServiceTracker tracker) throws Exception { IOUtils.cleanup(null, client); } } -} + +} \ No newline at end of file