diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto index ed17be236dc..0943e459ee8 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto @@ -46,6 +46,7 @@ message ControlRequest { PortCompletedRequest portCompletedRequest = 9; WorkerStateUpdatedRequest workerStateUpdatedRequest = 10; LinkWorkersRequest linkWorkersRequest = 11; + IterationCompletedRequest iterationCompletedRequest = 12; // request for worker AddInputChannelRequest addInputChannelRequest = 50; @@ -57,6 +58,7 @@ message ControlRequest { EmptyRequest emptyRequest = 56; PrepareCheckpointRequest prepareCheckpointRequest = 57; QueryStatisticsRequest queryStatisticsRequest = 58; + EndIterationRequest endIterationRequest = 59; // request for testing Ping ping = 100; @@ -168,6 +170,11 @@ message PortCompletedRequest { bool input = 2; } +// Notify controller that an output port has finished one iteration (used by loop operators). +message IterationCompletedRequest { + core.PortIdentity portId = 1 [(scalapb.field).no_box = true]; +} + message WorkerStateUpdatedRequest { worker.WorkerState state = 1 [(scalapb.field).no_box = true]; } @@ -271,4 +278,8 @@ message PrepareCheckpointRequest{ message QueryStatisticsRequest{ repeated core.ActorVirtualIdentity filterByWorkers = 1; +} + +message EndIterationRequest{ + core.ActorVirtualIdentity worker = 1 [(scalapb.field).no_box = true]; } \ No newline at end of file diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto index 70d189a3411..35d88dcf0f1 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto @@ -37,6 +37,7 @@ service ControllerService { rpc EvaluatePythonExpression(EvaluatePythonExpressionRequest) returns (EvaluatePythonExpressionResponse); rpc ConsoleMessageTriggered(ConsoleMessageTriggeredRequest) returns (EmptyReturn); rpc PortCompleted(PortCompletedRequest) returns (EmptyReturn); + rpc IterationCompleted(IterationCompletedRequest) returns (EmptyReturn); rpc StartWorkflow(EmptyRequest) returns (StartWorkflowResponse); rpc ResumeWorkflow(EmptyRequest) returns (EmptyReturn); rpc PauseWorkflow(EmptyRequest) returns (EmptyReturn); diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto index 43613b5cfdc..98405c91b78 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto @@ -122,6 +122,7 @@ enum WorkflowAggregatedState { PAUSED = 4; RESUMING = 5; COMPLETED = 6; + ITERATION_COMPLETED = 11; FAILED = 7; UNKNOWN = 8; KILLED = 9; diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto index dbcd6d8a5e0..82f42ada71c 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto @@ -47,6 +47,8 @@ service WorkerService { rpc EndWorker(EmptyRequest) returns (EmptyReturn); rpc StartChannel(EmptyRequest) returns (EmptyReturn); rpc EndChannel(EmptyRequest) returns (EmptyReturn); + rpc EndIteration(EndIterationRequest) returns (EmptyReturn); + rpc NextIteration(EmptyRequest) returns (EmptyReturn); rpc DebugCommand(DebugCommandRequest) returns (EmptyReturn); rpc EvaluatePythonExpression(EvaluatePythonExpressionRequest) returns (EvaluatedValue); rpc NoOperation(EmptyRequest) returns (EmptyReturn); diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala index 4d9a36bab43..d0add4789b5 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala @@ -40,6 +40,7 @@ class ControllerAsyncRPCHandlerInitializer( with ResumeHandler with StartWorkflowHandler with PortCompletedHandler + with IterationCompletedHandler with ConsoleMessageHandler with RetryWorkflowHandler with EvaluatePythonExpressionHandler diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala index 9dcf3ad4bfc..aa51f3f0ccd 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala @@ -52,6 +52,10 @@ class WorkflowScheduler( this.physicalPlan = updatedPhysicalPlan } - def getNextRegions: Set[Region] = if (!schedule.hasNext) Set() else schedule.next() + def getNextRegions: Set[Region] = { + val region : Set[Region] = if (!schedule.hasNext) Set() else schedule.loopNext() + println("current Region: " + region) + region + } } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecution.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecution.scala index d5939c2e3b1..e905c2b0449 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecution.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecution.scala @@ -59,8 +59,6 @@ case class RegionExecution(region: Region) { physicalOpId: PhysicalOpIdentity, inheritOperatorExecution: Option[OperatorExecution] = None ): OperatorExecution = { - assert(!operatorExecutions.contains(physicalOpId), "OperatorExecution already exists.") - operatorExecutions.getOrElseUpdate( physicalOpId, inheritOperatorExecution diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala index dea9b692a4f..b8b6d68091c 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala @@ -44,11 +44,6 @@ case class WorkflowExecution() { * @throws AssertionError if the `RegionExecution` has already been initialized. */ def initRegionExecution(region: Region): RegionExecution = { - // ensure the region execution hasn't been initialized already. - assert( - !regionExecutions.contains(region.id), - s"RegionExecution of ${region.id} already initialized." - ) regionExecutions.getOrElseUpdate(region.id, RegionExecution(region)) } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/IterationCompletedHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/IterationCompletedHandler.scala new file mode 100644 index 00000000000..2aab1e4be00 --- /dev/null +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/IterationCompletedHandler.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.engine.architecture.controller.promisehandlers + +import com.twitter.util.Future +import org.apache.texera.amber.core.WorkflowRuntimeException +import org.apache.texera.amber.core.workflow.GlobalPortIdentity +import org.apache.texera.amber.engine.architecture.controller.{ + ControllerAsyncRPCHandlerInitializer, + FatalError +} +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ + AsyncRPCContext, + IterationCompletedRequest, + QueryStatisticsRequest +} +import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn +import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER +import org.apache.texera.amber.util.VirtualIdentityUtils + +/** Notify controller that a worker has completed an iteration on an output port. + * + * This is different from [[PortCompletedHandler]]: a port can have multiple iterations + * (e.g., loop execution) before the whole port is fully completed. + */ +trait IterationCompletedHandler { + this: ControllerAsyncRPCHandlerInitializer => + + override def iterationCompleted( + msg: IterationCompletedRequest, + ctx: AsyncRPCContext + ): Future[EmptyReturn] = { + controllerInterface + .controllerInitiateQueryStatistics(QueryStatisticsRequest(scala.Seq(ctx.sender)), CONTROLLER) + .map { _ => + val globalPortId = GlobalPortIdentity( + VirtualIdentityUtils.getPhysicalOpId(ctx.sender), + msg.portId + ) + + cp.workflowExecutionCoordinator.getRegionOfPortId(globalPortId) match { + case Some(region) => + // Emit UI-only IterationCompleted phase for this region. + cp.workflowExecutionCoordinator.markRegionIterationCompletedIfNeeded(region) + + // Keep scheduler running + cp.workflowExecutionCoordinator + .coordinateRegionExecutors(cp.actorService) + .onFailure { + case err: WorkflowRuntimeException => + sendToClient(FatalError(err, err.relatedWorkerId)) + case other => + sendToClient(FatalError(other, None)) + } + case None => + } + + EmptyReturn() + } + } +} diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala index 4ab3d18056f..bb8843ff1cc 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala @@ -20,22 +20,16 @@ package org.apache.texera.amber.engine.architecture.messaginglayer import org.apache.texera.amber.core.state.State -import org.apache.texera.amber.core.storage.DocumentFactory +import org.apache.texera.amber.core.storage.{DocumentFactory, VFSResourceType} import org.apache.texera.amber.core.storage.model.BufferedItemWriter +import org.apache.texera.amber.core.storage.result.ResultSchema import org.apache.texera.amber.core.tuple._ import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity} import org.apache.texera.amber.core.workflow.{PhysicalLink, PortIdentity} -import org.apache.texera.amber.engine.architecture.messaginglayer.OutputManager.{ - DPOutputIterator, - getBatchSize, - toPartitioner -} +import org.apache.texera.amber.engine.architecture.messaginglayer.OutputManager.{DPOutputIterator, getBatchSize, toPartitioner} import org.apache.texera.amber.engine.architecture.sendsemantics.partitioners._ import org.apache.texera.amber.engine.architecture.sendsemantics.partitionings._ -import org.apache.texera.amber.engine.architecture.worker.managers.{ - OutputPortResultWriterThread, - PortStorageWriterTerminateSignal -} +import org.apache.texera.amber.engine.architecture.worker.managers.{OutputPortResultWriterThread, PortStorageWriterTerminateSignal} import org.apache.texera.amber.engine.common.AmberLogging import org.apache.texera.amber.util.VirtualIdentityUtils @@ -124,6 +118,10 @@ class OutputManager( : mutable.HashMap[PortIdentity, OutputPortResultWriterThread] = mutable.HashMap() + val ECMWriterThreads + : mutable.HashMap[PortIdentity, BufferedItemWriter[Tuple]] = + mutable.HashMap() + /** * Add down stream operator and its corresponding Partitioner. * @@ -245,7 +243,6 @@ class OutputManager( writerThread.join() case None => } - } def getPort(portId: PortIdentity): WorkerPort = ports(portId) @@ -260,6 +257,13 @@ class OutputManager( outputIterator.appendSpecialTupleToEnd(FinalizeExecutor()) } + def finalizeIteration(worker: ActorVirtualIdentity): Unit = { + this.ports.keys + .foreach(outputPortId => + outputIterator.appendSpecialTupleToEnd(FinalizeIteration(outputPortId, worker)) + ) + } + /** * This method is only used for ensuring correct region execution. Some operators may have input port dependency * relationships, for which we currently use a two-phase region execution scheme. (See `RegionExecutionCoordinator` @@ -280,12 +284,18 @@ class OutputManager( } private def setupOutputStorageWriterThread(portId: PortIdentity, storageUri: URI): Unit = { - val bufferedItemWriter = DocumentFactory + this.ECMWriterThreads(portId) = DocumentFactory + .createDocument(storageUri.resolve("ecm"), ResultSchema.ecmSchema) + .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString) + .asInstanceOf[BufferedItemWriter[Tuple]] + + val bufferedTupleWriter = DocumentFactory .openDocument(storageUri) ._1 .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString) .asInstanceOf[BufferedItemWriter[Tuple]] - val writerThread = new OutputPortResultWriterThread(bufferedItemWriter) + + val writerThread = new OutputPortResultWriterThread(bufferedTupleWriter) this.outputPortResultWriterThreads(portId) = writerThread writerThread.start() } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index 7e5b228801f..84b18a48b57 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -103,6 +103,7 @@ class RegionExecutionCoordinator( private case object Unexecuted extends RegionExecutionPhase private case object ExecutingDependeePortsPhase extends RegionExecutionPhase private case object ExecutingNonDependeePortsPhase extends RegionExecutionPhase + private case object IterationCompleted extends RegionExecutionPhase private case object Completed extends RegionExecutionPhase private val currentPhaseRef: AtomicReference[RegionExecutionPhase] = new AtomicReference( @@ -190,7 +191,8 @@ class RegionExecutionCoordinator( } } - def isCompleted: Boolean = currentPhaseRef.get == Completed + // Treat IterationCompleted as not completed from scheduler perspective. + def isCompleted: Boolean = currentPhaseRef.get == Completed || currentPhaseRef.get == IterationCompleted /** * This will sync and transition the region execution phase from one to another depending on its current phase: @@ -219,6 +221,9 @@ class RegionExecutionCoordinator( } case ExecutingNonDependeePortsPhase => tryCompleteRegionExecution() + case IterationCompleted => + // IterationCompleted is a UI/observability phase; scheduling doesn't advance on it. + Future.Unit case Completed => // Already completed, no further action needed. Future.Unit @@ -543,6 +548,13 @@ class RegionExecutionCoordinator( } } + /** Emit IterationCompleted region phase to frontend. This does not affect scheduling semantics. */ + def setIterationCompletedPhase(): Unit = { + if (currentPhaseRef.get != Completed) { + setPhase(IterationCompleted) + } + } + private def setPhase(phase: RegionExecutionPhase): Unit = { currentPhaseRef.set(phase) SessionState.getAllSessionStates.foreach { state => diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala index 6f34c9ed1e5..47474b8478c 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala @@ -21,14 +21,22 @@ package org.apache.texera.amber.engine.architecture.scheduling case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterator[Set[Region]] { private var currentLevel = levelSets.keys.minOption.getOrElse(0) - + private var loopStartLevel = currentLevel def getRegions: List[Region] = levelSets.values.flatten.toList override def hasNext: Boolean = levelSets.isDefinedAt(currentLevel) override def next(): Set[Region] = { val regions = levelSets(currentLevel) + if(regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopStart-operator-")))) loopStartLevel = currentLevel currentLevel += 1 regions } + + def loopNext(): Set[Region] = { + val regions = levelSets(currentLevel) + if(regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopEnd-operator-")))) currentLevel = loopStartLevel + else currentLevel += 1 + regions + } } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala index 05585f88d8d..8939c2f4f38 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala @@ -27,7 +27,8 @@ import org.apache.texera.amber.engine.architecture.common.{ AkkaActorService } import org.apache.texera.amber.engine.architecture.controller.ControllerConfig -import org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution +import org.apache.texera.amber.engine.architecture.controller.execution.{OperatorExecution, RegionExecution, WorkflowExecution} +import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient import scala.collection.mutable @@ -102,6 +103,27 @@ class WorkflowExecutionCoordinator( .unit } + /** + * Mark the given operator as iteration-completed. + * + * This is a UI/observability state only; it doesn't change the worker-level state machine. + */ + def markOperatorIterationCompleted(operatorExecution: OperatorExecution): Unit = { + // No-op placeholder: operator aggregated state is computed from WorkerState. + // IterationCompleted is currently an observability/UI state driven by region phase events. + () + } + + /** + * If all operators in the region have reached ITERATION_COMPLETED (or COMPLETED), mark the region as + * ITERATION_COMPLETED (region is still considered not completed, so scheduling remains unchanged). + */ + def markRegionIterationCompletedIfNeeded(region: Region): Unit = { + // Best-effort: tell region coordinator to emit an IterationCompleted phase event. + // This doesn't affect scheduling/completion semantics. + regionExecutionCoordinators.get(region.id).foreach(_.setIterationCompletedPhase()) + } + def getRegionOfLink(link: PhysicalLink): Region = { getExecutingRegions.find(region => region.getLinks.contains(link)).get } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala index 3aa5fa90a46..13f0b17de4f 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala @@ -19,47 +19,33 @@ package org.apache.texera.amber.engine.architecture.worker +import com.google.protobuf.timestamp.Timestamp import com.softwaremill.macwire.wire import io.grpc.MethodDescriptor import org.apache.texera.amber.core.executor.OperatorExecutor import org.apache.texera.amber.core.state.State +import org.apache.texera.amber.core.storage.result.ResultSchema import org.apache.texera.amber.core.tuple._ -import org.apache.texera.amber.core.virtualidentity.{ - ActorVirtualIdentity, - ChannelIdentity, - EmbeddedControlMessageIdentity -} +import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity, EmbeddedControlMessageIdentity} import org.apache.texera.amber.core.workflow.PortIdentity import org.apache.texera.amber.engine.architecture.common.AmberProcessor import org.apache.texera.amber.engine.architecture.logreplay.ReplayLogManager -import org.apache.texera.amber.engine.architecture.messaginglayer.{ - InputManager, - OutputManager, - WorkerTimerService -} -import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.{ - NO_ALIGNMENT, - PORT_ALIGNMENT -} +import org.apache.texera.amber.engine.architecture.messaginglayer.{InputManager, OutputManager, WorkerTimerService} +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.ConsoleMessageType.{ERROR, PRINT} +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.{NO_ALIGNMENT, PORT_ALIGNMENT} import org.apache.texera.amber.engine.architecture.rpc.controlcommands._ import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn -import org.apache.texera.amber.engine.architecture.rpc.workerservice.WorkerServiceGrpc.METHOD_END_CHANNEL -import org.apache.texera.amber.engine.architecture.worker.WorkflowWorker.{ - DPInputQueueElement, - MainThreadDelegateMessage -} +import org.apache.texera.amber.engine.architecture.rpc.workerservice.WorkerServiceGrpc.{METHOD_END_CHANNEL, METHOD_END_ITERATION} +import org.apache.texera.amber.engine.architecture.worker.WorkflowWorker.{DPInputQueueElement, MainThreadDelegateMessage} import org.apache.texera.amber.engine.architecture.worker.managers.SerializationManager -import org.apache.texera.amber.engine.architecture.worker.statistics.WorkerState.{ - COMPLETED, - READY, - RUNNING -} +import org.apache.texera.amber.engine.architecture.worker.statistics.WorkerState.{COMPLETED, READY, RUNNING} import org.apache.texera.amber.engine.architecture.worker.statistics.WorkerStatistics import org.apache.texera.amber.engine.common.ambermessage._ import org.apache.texera.amber.engine.common.statetransition.WorkerStateManager import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER import org.apache.texera.amber.error.ErrorUtils.{mkConsoleMessage, safely} +import java.time.Instant import java.util.concurrent.LinkedBlockingQueue class DataProcessor( @@ -157,7 +143,7 @@ class DataProcessor( if (outputTuple == null) return outputTuple match { case FinalizeExecutor() => - sendECMToDataChannels(METHOD_END_CHANNEL, PORT_ALIGNMENT) + sendECMToDataChannels(METHOD_END_CHANNEL.getBareMethodName, PORT_ALIGNMENT) // Send Completed signal to worker actor. executor.close() adaptiveBatchingMonitor.stopAdaptiveBatching() @@ -171,7 +157,7 @@ class DataProcessor( EmptyRequest(), asyncRPCClient.mkContext(CONTROLLER) ) - case FinalizePort(portId, input) => + case FinalizePort(portId: PortIdentity, input: Boolean) => if (!input) { outputManager.closeOutputStorageWriterIfNeeded(portId) } @@ -179,11 +165,23 @@ class DataProcessor( PortCompletedRequest(portId, input), asyncRPCClient.mkContext(CONTROLLER) ) + case FinalizeIteration(portId: PortIdentity, worker: ActorVirtualIdentity) => + println(s"FinalizeIteration received at worker $actorId for port $portId") + sendECMToDataChannels( + METHOD_END_ITERATION.getBareMethodName, + PORT_ALIGNMENT, + EndIterationRequest(worker) + ) + val writer = outputManager.ECMWriterThreads(portId) + writer.putOne(new Tuple(ResultSchema.ecmSchema, Array(worker.name))) + writer.close() + outputManager.closeOutputStorageWriterIfNeeded(portId) + asyncRPCClient.controllerInterface.iterationCompleted(IterationCompletedRequest(portId), asyncRPCClient.mkContext(CONTROLLER)) + executor.reset() case schemaEnforceable: SchemaEnforceable => val portIdentity = outputPortOpt.getOrElse(outputManager.getSingleOutputPortIdentity) val tuple = schemaEnforceable.enforceSchema(outputManager.getPort(portIdentity).schema) statisticsManager.increaseOutputStatistics(portIdentity, tuple.inMemSize) - outputManager.passTupleToDownstream(tuple, outputPortOpt) outputManager.saveTupleToStorageIfNeeded(tuple, outputPortOpt) case other => // skip for now @@ -234,6 +232,12 @@ class DataProcessor( inputManager.currentChannelId = channelId val command = ecm.commandMapping.get(actorId.name) logger.info(s"receive ECM from $channelId, id = ${ecm.id}, cmd = $command") + + + asyncRPCClient.controllerInterface.consoleMessageTriggered( + ConsoleMessageTriggeredRequest(ConsoleMessage(actorId.name, Timestamp(Instant.now), PRINT, "", s"received ECM from MATERIALIZATION_READER, id = ${ecm.id}", s"cmd = $command")), + asyncRPCClient.mkContext(CONTROLLER) + ) if (ecm.ecmType != NO_ALIGNMENT) { pauseManager.pauseInputChannel(ECMPause(ecm.id), List(channelId)) } @@ -264,23 +268,41 @@ class DataProcessor( } } + def processOnFinish(): Unit = { + val portId = inputGateway.getChannel(inputManager.currentChannelId).getPortId + try { + val outputState = executor.produceStateOnFinish(portId.id) + if (outputState.isDefined) { + outputManager.emitState(outputState.get) + } + outputManager.outputIterator.setTupleOutput( + executor.onFinishMultiPort(portId.id) + ) + } catch safely { + case e => + // forward input tuple to the user and pause DP thread + handleExecutorException(e) + } + } + def sendECMToDataChannels( - method: MethodDescriptor[EmptyRequest, EmptyReturn], - alignment: EmbeddedControlMessageType + method: String, + alignment: EmbeddedControlMessageType, + request: ControlRequest = EmptyRequest() ): Unit = { outputManager.flush() outputGateway.getActiveChannels .filter(!_.isControl) .foreach { activeChannelId => asyncRPCClient.sendECMToChannel( - EmbeddedControlMessageIdentity(method.getBareMethodName), + EmbeddedControlMessageIdentity(method), alignment, Set(), Map( activeChannelId.toWorkerId.name -> ControlInvocation( - method.getBareMethodName, - EmptyRequest(), + method, + request, AsyncRPCContext(ActorVirtualIdentity(""), ActorVirtualIdentity("")), -1 ) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala index 2abcdf66975..a69b783876e 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala @@ -47,6 +47,8 @@ class DataProcessorRPCHandlerInitializer(val dp: DataProcessor) with EndHandler with StartChannelHandler with EndChannelHandler + with EndIterationHandler + with NextIterationHandler with AssignPortHandler with AddInputChannelHandler with FlushNetworkBufferHandler diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala index 10fbbc44a2c..255339004a8 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala @@ -24,27 +24,14 @@ import org.apache.texera.amber.config.ApplicationConfig import org.apache.texera.amber.core.storage.DocumentFactory import org.apache.texera.amber.core.storage.model.VirtualDocument import org.apache.texera.amber.core.tuple.Tuple -import org.apache.texera.amber.core.virtualidentity.{ - ActorVirtualIdentity, - ChannelIdentity, - EmbeddedControlMessageIdentity -} +import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity, EmbeddedControlMessageIdentity} import org.apache.texera.amber.engine.architecture.messaginglayer.OutputManager.toPartitioner -import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.{ - NO_ALIGNMENT, - PORT_ALIGNMENT -} +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.{NO_ALIGNMENT, PORT_ALIGNMENT} import org.apache.texera.amber.engine.architecture.rpc.controlcommands._ import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn -import org.apache.texera.amber.engine.architecture.rpc.workerservice.WorkerServiceGrpc.{ - METHOD_END_CHANNEL, - METHOD_START_CHANNEL -} +import org.apache.texera.amber.engine.architecture.rpc.workerservice.WorkerServiceGrpc.{METHOD_END_CHANNEL, METHOD_END_ITERATION, METHOD_START_CHANNEL} import org.apache.texera.amber.engine.architecture.sendsemantics.partitionings.Partitioning -import org.apache.texera.amber.engine.architecture.worker.WorkflowWorker.{ - DPInputQueueElement, - FIFOMessageElement -} +import org.apache.texera.amber.engine.architecture.worker.WorkflowWorker.{DPInputQueueElement, FIFOMessageElement} import org.apache.texera.amber.engine.common.ambermessage.{DataFrame, WorkflowFIFOMessage} import org.apache.texera.amber.util.VirtualIdentityUtils.getFromActorIdForInputPortStorage @@ -82,8 +69,15 @@ class InputPortMaterializationReaderThread( */ override def run(): Unit = { // Notify the input port of start of input channel - emitECM(METHOD_START_CHANNEL, NO_ALIGNMENT) + emitECM(METHOD_START_CHANNEL.getBareMethodName, NO_ALIGNMENT) try { + val ecm: VirtualDocument[Tuple] = DocumentFactory + .openDocument(uri.resolve("ecm")) + ._1 + .asInstanceOf[VirtualDocument[Tuple]] + val ecmReadIterator = ecm.get() + + val materialization: VirtualDocument[Tuple] = DocumentFactory .openDocument(uri) ._1 @@ -106,7 +100,15 @@ class InputPortMaterializationReaderThread( } // Flush any remaining tuples in the buffer. if (buffer.nonEmpty) flush() - emitECM(METHOD_END_CHANNEL, PORT_ALIGNMENT) + + + if (ecmReadIterator.hasNext) { + val tuple = ecmReadIterator.next() + println("Received ECM tuple: " + tuple.getField("workerId")) + emitECM(METHOD_END_ITERATION.getBareMethodName, NO_ALIGNMENT, EndIterationRequest(ActorVirtualIdentity(tuple.getField("workerId")))) + } else { + emitECM(METHOD_END_CHANNEL.getBareMethodName, PORT_ALIGNMENT) + } isFinished.set(true) } catch { case e: Exception => @@ -118,19 +120,21 @@ class InputPortMaterializationReaderThread( * Puts an ECM into the internal queue. */ private def emitECM( - method: MethodDescriptor[EmptyRequest, EmptyReturn], - alignment: EmbeddedControlMessageType + method: String, + alignment: EmbeddedControlMessageType, + request: ControlRequest = EmptyRequest() + ): Unit = { flush() val ecm = EmbeddedControlMessage( - EmbeddedControlMessageIdentity(method.getBareMethodName), + EmbeddedControlMessageIdentity(method), alignment, Seq(), Map( workerActorId.name -> ControlInvocation( - method.getBareMethodName, - EmptyRequest(), + method, + request, AsyncRPCContext(ActorVirtualIdentity(""), ActorVirtualIdentity("")), -1 ) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala index 28e5d2af667..188a08d28e5 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala @@ -22,6 +22,7 @@ package org.apache.texera.amber.engine.architecture.worker.managers import com.google.common.collect.Queues import org.apache.texera.amber.core.storage.model.BufferedItemWriter import org.apache.texera.amber.core.tuple.Tuple +import org.apache.texera.amber.core.storage.result.ResultSchema import java.util.concurrent.LinkedBlockingQueue @@ -29,7 +30,7 @@ sealed trait TerminateSignal case object PortStorageWriterTerminateSignal extends TerminateSignal class OutputPortResultWriterThread( - bufferedItemWriter: BufferedItemWriter[Tuple] + bufferedTupleWriter: BufferedItemWriter[Tuple] ) extends Thread { val queue: LinkedBlockingQueue[Either[Tuple, TerminateSignal]] = @@ -40,10 +41,10 @@ class OutputPortResultWriterThread( while (!internalStop) { val queueContent = queue.take() queueContent match { - case Left(tuple) => bufferedItemWriter.putOne(tuple) + case Left(tuple) => bufferedTupleWriter.putOne(tuple) case Right(_) => internalStop = true } } - bufferedItemWriter.close() + bufferedTupleWriter.close() } } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndChannelHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndChannelHandler.scala index 7794342690b..43db78f6df1 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndChannelHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndChannelHandler.scala @@ -20,6 +20,7 @@ package org.apache.texera.amber.engine.architecture.worker.promisehandlers import com.twitter.util.Future +import org.apache.texera.amber.operator.loop.LoopStartOpExec import org.apache.texera.amber.core.tuple.FinalizePort import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ AsyncRPCContext, @@ -40,19 +41,7 @@ trait EndChannelHandler { val portId = dp.inputGateway.getChannel(channelId).getPortId dp.inputManager.getPort(portId).completed = true dp.inputManager.initBatch(channelId, Array.empty) - try { - val outputState = dp.executor.produceStateOnFinish(portId.id) - if (outputState.isDefined) { - dp.outputManager.emitState(outputState.get) - } - dp.outputManager.outputIterator.setTupleOutput( - dp.executor.onFinishMultiPort(portId.id) - ) - } catch safely { - case e => - // forward input tuple to the user and pause DP thread - dp.handleExecutorException(e) - } + dp.processOnFinish() dp.outputManager.outputIterator.appendSpecialTupleToEnd( FinalizePort(portId, input = true) @@ -63,7 +52,12 @@ trait EndChannelHandler { // See documentation of isMissingOutputPort if (!dp.outputManager.isMissingOutputPort) { // assuming all the output ports finalize after all input ports are finalized. - dp.outputManager.finalizeOutput() + dp.executor match { + case executor: LoopStartOpExec if executor.checkCondition() => + dp.outputManager.finalizeIteration(dp.actorId) + case _ => + dp.outputManager.finalizeOutput() + } } } EmptyReturn() diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala new file mode 100644 index 00000000000..80869dc17f0 --- /dev/null +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.engine.architecture.worker.promisehandlers + +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext, EmptyRequest, EndIterationRequest} +import org.apache.texera.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer +import com.twitter.util.Future +import org.apache.texera.amber.core.tuple.FinalizePort +import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity +import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn +import org.apache.texera.amber.operator.loop.{LoopEndOpExec, LoopStartOpExec} + +trait EndIterationHandler { + this: DataProcessorRPCHandlerInitializer => + + override def endIteration( + request: EndIterationRequest, + ctx: AsyncRPCContext + ): Future[EmptyReturn] = { + dp.executor match { + case _: LoopEndOpExec => + workerInterface.nextIteration(EmptyRequest(), mkContext(request.worker)) + case _ => + } + val channelId = dp.inputManager.currentChannelId + val portId = dp.inputGateway.getChannel(channelId).getPortId + dp.inputManager.getPort(portId).completed = true + dp.inputManager.initBatch(channelId, Array.empty) + dp.processOnFinish() + + dp.outputManager.outputIterator.appendSpecialTupleToEnd( + FinalizePort(portId, input = true) + ) + + if (dp.inputManager.getAllPorts.forall(portId => dp.inputManager.isPortCompleted(portId))) { + // Need this check for handling input port dependency relationships. + // See documentation of isMissingOutputPort + if (!dp.outputManager.isMissingOutputPort) { + // assuming all the output ports finalize after all input ports are finalized. + dp.outputManager.finalizeIteration(request.worker) + } + } + EmptyReturn() + } +} diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala new file mode 100644 index 00000000000..0bf1691ed5d --- /dev/null +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.engine.architecture.worker.promisehandlers + +import com.twitter.util.Future +import org.apache.texera.amber.core.tuple.FinalizeIteration +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext, EmptyRequest} +import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn +import org.apache.texera.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer +import org.apache.texera.amber.operator.loop.LoopStartOpExec + +trait NextIterationHandler { + this: DataProcessorRPCHandlerInitializer => + + override def nextIteration( + request: EmptyRequest, + ctx: AsyncRPCContext + ): Future[EmptyReturn] = { + dp.processOnFinish() + dp.outputManager.finalizeIteration(dp.actorId) + EmptyReturn() + } +} diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/StartChannelHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/StartChannelHandler.scala index 01cbb858bd7..8a3d431ea5f 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/StartChannelHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/StartChannelHandler.scala @@ -38,7 +38,7 @@ trait StartChannelHandler { ctx: AsyncRPCContext ): Future[EmptyReturn] = { val portId = dp.inputGateway.getChannel(dp.inputManager.currentChannelId).getPortId - dp.sendECMToDataChannels(METHOD_START_CHANNEL, NO_ALIGNMENT) + dp.sendECMToDataChannels(METHOD_START_CHANNEL.getBareMethodName, NO_ALIGNMENT) try { val outputState = dp.executor.produceStateOnStart(portId.id) if (outputState.isDefined) { diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/common/Utils.scala b/amber/src/main/scala/org/apache/texera/amber/engine/common/Utils.scala index dc074c1094d..488b5e02050 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/common/Utils.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/common/Utils.scala @@ -98,6 +98,7 @@ object Utils extends LazyLogging { case WorkflowAggregatedState.PAUSING => "Pausing" case WorkflowAggregatedState.PAUSED => "Paused" case WorkflowAggregatedState.RESUMING => "Resuming" + case WorkflowAggregatedState.ITERATION_COMPLETED => "IterationCompleted" case WorkflowAggregatedState.COMPLETED => "Completed" case WorkflowAggregatedState.TERMINATED => "Terminated" case WorkflowAggregatedState.FAILED => "Failed" @@ -117,6 +118,7 @@ object Utils extends LazyLogging { case "pausing" => WorkflowAggregatedState.PAUSING case "paused" => WorkflowAggregatedState.PAUSED case "resuming" => WorkflowAggregatedState.RESUMING + case "iterationcompleted" => WorkflowAggregatedState.ITERATION_COMPLETED case "completed" => WorkflowAggregatedState.COMPLETED case "failed" => WorkflowAggregatedState.FAILED case "killed" => WorkflowAggregatedState.KILLED diff --git a/common/config/src/main/resources/storage.conf b/common/config/src/main/resources/storage.conf index 85a62b77a3b..a158eb87530 100644 --- a/common/config/src/main/resources/storage.conf +++ b/common/config/src/main/resources/storage.conf @@ -52,6 +52,9 @@ storage { runtime-statistics-namespace = "workflow-runtime-statistics" runtime-statistics-namespace = ${?STORAGE_ICEBERG_TABLE_RUNTIME_STATISTICS_NAMESPACE} + ecm-namespace = "ecm" + ecm-namespace = ${?STORAGE_ICEBERG_TABLE_ECM_NAMESPACE} + commit { batch-size = 4096 # decide the buffer size of our IcebergTableWriter batch-size = ${?STORAGE_ICEBERG_TABLE_COMMIT_BATCH_SIZE} diff --git a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala index c5bd3302862..8a1aba73b76 100644 --- a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala +++ b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala @@ -63,6 +63,7 @@ object StorageConfig { conf.getInt("storage.iceberg.table.commit.retry.min-wait-ms") val icebergTableCommitMaxRetryWaitMs: Int = conf.getInt("storage.iceberg.table.commit.retry.max-wait-ms") + val icebergTableECMNamespace: String = conf.getString("storage.iceberg.table.ecm-namespace") // LakeFS specifics // lakefsEndpoint is a var because in test we need to override it to point to the test container @@ -116,6 +117,7 @@ object StorageConfig { val ENV_ICEBERG_TABLE_COMMIT_NUM_RETRIES = "STORAGE_ICEBERG_TABLE_COMMIT_NUM_RETRIES" val ENV_ICEBERG_TABLE_COMMIT_MIN_WAIT_MS = "STORAGE_ICEBERG_TABLE_COMMIT_MIN_WAIT_MS" val ENV_ICEBERG_TABLE_COMMIT_MAX_WAIT_MS = "STORAGE_ICEBERG_TABLE_COMMIT_MAX_WAIT_MS" + val ENV_ICEBERG_TABLE_ECM_NAMESPACE = "STORAGE_ICEBERG_TABLE_ECM_NAMESPACE" // LakeFS val ENV_LAKEFS_ENDPOINT = "STORAGE_LAKEFS_ENDPOINT" diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/executor/OperatorExecutor.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/executor/OperatorExecutor.scala index f99739acc04..b9404d839e3 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/executor/OperatorExecutor.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/executor/OperatorExecutor.scala @@ -56,4 +56,9 @@ trait OperatorExecutor { def close(): Unit = {} + def reset(): Unit = { + close() + open() + } + } diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala index 4c37c33bb20..5b27170560f 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala @@ -72,6 +72,7 @@ object DocumentFactory { case RESULT => StorageConfig.icebergTableResultNamespace case CONSOLE_MESSAGES => StorageConfig.icebergTableConsoleMessagesNamespace case RUNTIME_STATISTICS => StorageConfig.icebergTableRuntimeStatisticsNamespace + case ECM => StorageConfig.icebergTableECMNamespace case _ => throw new IllegalArgumentException(s"Resource type $resourceType is not supported") } @@ -126,6 +127,7 @@ object DocumentFactory { case RESULT => StorageConfig.icebergTableResultNamespace case CONSOLE_MESSAGES => StorageConfig.icebergTableConsoleMessagesNamespace case RUNTIME_STATISTICS => StorageConfig.icebergTableRuntimeStatisticsNamespace + case ECM => StorageConfig.icebergTableECMNamespace case _ => throw new IllegalArgumentException(s"Resource type $resourceType is not supported") } diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala index 3513ac5ecd8..f4f3a9e4bf1 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala @@ -34,6 +34,7 @@ object VFSResourceType extends Enumeration { val RESULT: Value = Value("result") val RUNTIME_STATISTICS: Value = Value("runtimeStatistics") val CONSOLE_MESSAGES: Value = Value("consoleMessages") + val ECM: Value = Value("ecm") } object VFSURIFactory { diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/ResultSchema.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/ResultSchema.scala index ade33283f7f..a3978d0c714 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/ResultSchema.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/ResultSchema.scala @@ -39,4 +39,8 @@ object ResultSchema { val consoleMessagesSchema: Schema = new Schema( new Attribute("message", AttributeType.STRING) ) + + val ecmSchema: Schema = new Schema( + new Attribute("workerId", AttributeType.STRING) + ) } diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/TupleLike.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/TupleLike.scala index 040b94977a1..cbdf8591bde 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/TupleLike.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/TupleLike.scala @@ -19,6 +19,7 @@ package org.apache.texera.amber.core.tuple +import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity import org.apache.texera.amber.core.workflow.PortIdentity import scala.jdk.CollectionConverters.CollectionHasAsScala @@ -41,6 +42,7 @@ trait InternalMarker extends TupleLike { final case class FinalizePort(portId: PortIdentity, input: Boolean) extends InternalMarker final case class FinalizeExecutor() extends InternalMarker +final case class FinalizeIteration(portId: PortIdentity, worker: ActorVirtualIdentity) extends InternalMarker trait SeqTupleLike extends TupleLike with SchemaEnforceable { override def inMemSize: Long = ??? diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala index eb319a82d1d..d761341f76e 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala @@ -49,6 +49,7 @@ import org.apache.texera.amber.operator.huggingFace.{ HuggingFaceTextSummarizationOpDesc } import org.apache.texera.amber.operator.ifStatement.IfOpDesc +import org.apache.texera.amber.operator.loop.{LoopEndOpDesc, LoopStartOpDesc} import org.apache.texera.amber.operator.intersect.IntersectOpDesc import org.apache.texera.amber.operator.intervalJoin.IntervalJoinOpDesc import org.apache.texera.amber.operator.keywordSearch.KeywordSearchOpDesc @@ -202,6 +203,8 @@ trait StateTransferFunc new Type(value = classOf[TypeCastingOpDesc], name = "TypeCasting"), new Type(value = classOf[LimitOpDesc], name = "Limit"), new Type(value = classOf[SleepOpDesc], name = "Sleep"), + new Type(value = classOf[LoopStartOpDesc], name = "LoopStart"), + new Type(value = classOf[LoopEndOpDesc], name = "LoopEnd"), new Type(value = classOf[RandomKSamplingOpDesc], name = "RandomKSampling"), new Type(value = classOf[ReservoirSamplingOpDesc], name = "ReservoirSampling"), new Type(value = classOf[HashJoinOpDesc[String]], name = "HashJoin"), diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala new file mode 100644 index 00000000000..f56068e9036 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.loop + +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PhysicalOp} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} + +class LoopEndOpDesc extends LogicalOp { + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = { + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithClassName("org.apache.texera.amber.operator.loop.LoopEndOpExec") + ) + .withInputPorts(operatorInfo.inputPorts) + .withOutputPorts(operatorInfo.outputPorts) + .withSuggestedWorkerNum(1) + } + + override def operatorInfo: OperatorInfo = + OperatorInfo( + "Loop End", + "Loop End", + OperatorGroupConstants.CONTROL_GROUP, + inputPorts = List(InputPort()), + outputPorts = List(OutputPort()) + ) +} diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala new file mode 100644 index 00000000000..60f18cd5fc9 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala @@ -0,0 +1,8 @@ +package org.apache.texera.amber.operator.loop + +import org.apache.texera.amber.core.executor.OperatorExecutor +import org.apache.texera.amber.core.tuple.{Tuple, TupleLike} + +class LoopEndOpExec extends OperatorExecutor { + override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = Iterator(tuple) +} diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala new file mode 100644 index 00000000000..d0c30256f33 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.loop + +import com.fasterxml.jackson.annotation.JsonProperty +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PhysicalOp} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} +import org.apache.texera.amber.util.JSONUtils.objectMapper + +class LoopStartOpDesc extends LogicalOp { + + @JsonProperty(required = true) + @JsonSchemaTitle("Iteration Number") + var iteration: Int = _ + + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = { + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithClassName( + "org.apache.texera.amber.operator.loop.LoopStartOpExec", + objectMapper.writeValueAsString(this) + ) + ) + .withInputPorts(operatorInfo.inputPorts) + .withOutputPorts(operatorInfo.outputPorts) + .withSuggestedWorkerNum(1) + .withParallelizable(false) + } + + override def operatorInfo: OperatorInfo = + OperatorInfo( + "Loop Start", + "Loop Start", + OperatorGroupConstants.CONTROL_GROUP, + inputPorts = List(InputPort()), + outputPorts = List(OutputPort()) + ) + +} diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpExec.scala new file mode 100644 index 00000000000..68c5bcc0e3e --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpExec.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.loop + +import org.apache.texera.amber.core.executor.OperatorExecutor +import org.apache.texera.amber.core.tuple.{Tuple, TupleLike} +import org.apache.texera.amber.util.JSONUtils.objectMapper + +import scala.collection.mutable + +class LoopStartOpExec(descString: String) extends OperatorExecutor { + private val desc: LoopStartOpDesc = objectMapper.readValue(descString, classOf[LoopStartOpDesc]) + private val data = new mutable.ArrayBuffer[Tuple] + private var currentIteration = 0 + + def checkCondition(): Boolean = { + desc.iteration > currentIteration + } + + override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = { + data.append(tuple) + Iterator.empty + } + + override def onFinish(port: Int): Iterator[TupleLike] = { + currentIteration += 1 + data.iterator + } + +} diff --git a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts index b23f92caf32..08bc601c4a7 100644 --- a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts +++ b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts @@ -384,6 +384,7 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy const colorMap: Record = { ExecutingDependeePortsPhase: "rgba(33,150,243,0.2)", ExecutingNonDependeePortsPhase: "rgba(255,213,79,0.2)", + IterationCompleted: "rgba(156,39,176,0.2)", Completed: "rgba(76,175,80,0.2)", }; this.paper.getModelById("region-" + region.id).attr("body/fill", colorMap[region.state]); diff --git a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts index 77458947cd6..f54068ad636 100644 --- a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts +++ b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts @@ -401,6 +401,9 @@ export class JointUIService { case OperatorState.Ready: fillColor = "#a6bd37"; break; + case OperatorState.IterationCompleted: + fillColor = "#9C27B0"; + break; case OperatorState.Completed: fillColor = "green"; break; diff --git a/frontend/src/app/workspace/types/execute-workflow.interface.ts b/frontend/src/app/workspace/types/execute-workflow.interface.ts index 23ade231998..0033ac39c33 100644 --- a/frontend/src/app/workspace/types/execute-workflow.interface.ts +++ b/frontend/src/app/workspace/types/execute-workflow.interface.ts @@ -74,6 +74,7 @@ export enum OperatorState { Pausing = "Pausing", Paused = "Paused", Resuming = "Resuming", + IterationCompleted = "IterationCompleted", Completed = "Completed", Recovering = "Recovering", } diff --git a/frontend/src/assets/operator_images/LoopEnd.png b/frontend/src/assets/operator_images/LoopEnd.png new file mode 100644 index 00000000000..ee0f9ab6fac Binary files /dev/null and b/frontend/src/assets/operator_images/LoopEnd.png differ diff --git a/frontend/src/assets/operator_images/LoopStart.png b/frontend/src/assets/operator_images/LoopStart.png new file mode 100644 index 00000000000..7e5be023cdf Binary files /dev/null and b/frontend/src/assets/operator_images/LoopStart.png differ