Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ message ControlRequest {
PortCompletedRequest portCompletedRequest = 9;
WorkerStateUpdatedRequest workerStateUpdatedRequest = 10;
LinkWorkersRequest linkWorkersRequest = 11;
IterationCompletedRequest iterationCompletedRequest = 12;

// request for worker
AddInputChannelRequest addInputChannelRequest = 50;
Expand All @@ -57,6 +58,7 @@ message ControlRequest {
EmptyRequest emptyRequest = 56;
PrepareCheckpointRequest prepareCheckpointRequest = 57;
QueryStatisticsRequest queryStatisticsRequest = 58;
EndIterationRequest endIterationRequest = 59;

// request for testing
Ping ping = 100;
Expand Down Expand Up @@ -168,6 +170,11 @@ message PortCompletedRequest {
bool input = 2;
}

// Notify controller that an output port has finished one iteration (used by loop operators).
message IterationCompletedRequest {
core.PortIdentity portId = 1 [(scalapb.field).no_box = true];
}

message WorkerStateUpdatedRequest {
worker.WorkerState state = 1 [(scalapb.field).no_box = true];
}
Expand Down Expand Up @@ -271,4 +278,8 @@ message PrepareCheckpointRequest{

message QueryStatisticsRequest{
repeated core.ActorVirtualIdentity filterByWorkers = 1;
}

message EndIterationRequest{
core.ActorVirtualIdentity worker = 1 [(scalapb.field).no_box = true];
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ service ControllerService {
rpc EvaluatePythonExpression(EvaluatePythonExpressionRequest) returns (EvaluatePythonExpressionResponse);
rpc ConsoleMessageTriggered(ConsoleMessageTriggeredRequest) returns (EmptyReturn);
rpc PortCompleted(PortCompletedRequest) returns (EmptyReturn);
rpc IterationCompleted(IterationCompletedRequest) returns (EmptyReturn);
rpc StartWorkflow(EmptyRequest) returns (StartWorkflowResponse);
rpc ResumeWorkflow(EmptyRequest) returns (EmptyReturn);
rpc PauseWorkflow(EmptyRequest) returns (EmptyReturn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ enum WorkflowAggregatedState {
PAUSED = 4;
RESUMING = 5;
COMPLETED = 6;
ITERATION_COMPLETED = 11;
FAILED = 7;
UNKNOWN = 8;
KILLED = 9;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ service WorkerService {
rpc EndWorker(EmptyRequest) returns (EmptyReturn);
rpc StartChannel(EmptyRequest) returns (EmptyReturn);
rpc EndChannel(EmptyRequest) returns (EmptyReturn);
rpc EndIteration(EndIterationRequest) returns (EmptyReturn);
rpc NextIteration(EmptyRequest) returns (EmptyReturn);
rpc DebugCommand(DebugCommandRequest) returns (EmptyReturn);
rpc EvaluatePythonExpression(EvaluatePythonExpressionRequest) returns (EvaluatedValue);
rpc NoOperation(EmptyRequest) returns (EmptyReturn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class ControllerAsyncRPCHandlerInitializer(
with ResumeHandler
with StartWorkflowHandler
with PortCompletedHandler
with IterationCompletedHandler
with ConsoleMessageHandler
with RetryWorkflowHandler
with EvaluatePythonExpressionHandler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ class WorkflowScheduler(
this.physicalPlan = updatedPhysicalPlan
}

def getNextRegions: Set[Region] = if (!schedule.hasNext) Set() else schedule.next()
def getNextRegions: Set[Region] = {
val region : Set[Region] = if (!schedule.hasNext) Set() else schedule.loopNext()
println("current Region: " + region)
region
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ case class RegionExecution(region: Region) {
physicalOpId: PhysicalOpIdentity,
inheritOperatorExecution: Option[OperatorExecution] = None
): OperatorExecution = {
assert(!operatorExecutions.contains(physicalOpId), "OperatorExecution already exists.")

operatorExecutions.getOrElseUpdate(
physicalOpId,
inheritOperatorExecution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,6 @@ case class WorkflowExecution() {
* @throws AssertionError if the `RegionExecution` has already been initialized.
*/
def initRegionExecution(region: Region): RegionExecution = {
// ensure the region execution hasn't been initialized already.
assert(
!regionExecutions.contains(region.id),
s"RegionExecution of ${region.id} already initialized."
)
regionExecutions.getOrElseUpdate(region.id, RegionExecution(region))
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.amber.engine.architecture.controller.promisehandlers

import com.twitter.util.Future
import org.apache.texera.amber.core.WorkflowRuntimeException
import org.apache.texera.amber.core.workflow.GlobalPortIdentity
import org.apache.texera.amber.engine.architecture.controller.{
ControllerAsyncRPCHandlerInitializer,
FatalError
}
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
AsyncRPCContext,
IterationCompletedRequest,
QueryStatisticsRequest
}
import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
import org.apache.texera.amber.util.VirtualIdentityUtils

/** Notify controller that a worker has completed an iteration on an output port.
*
* This is different from [[PortCompletedHandler]]: a port can have multiple iterations
* (e.g., loop execution) before the whole port is fully completed.
*/
trait IterationCompletedHandler {
this: ControllerAsyncRPCHandlerInitializer =>

override def iterationCompleted(
msg: IterationCompletedRequest,
ctx: AsyncRPCContext
): Future[EmptyReturn] = {
controllerInterface
.controllerInitiateQueryStatistics(QueryStatisticsRequest(scala.Seq(ctx.sender)), CONTROLLER)
.map { _ =>
val globalPortId = GlobalPortIdentity(
VirtualIdentityUtils.getPhysicalOpId(ctx.sender),
msg.portId
)

cp.workflowExecutionCoordinator.getRegionOfPortId(globalPortId) match {
case Some(region) =>
// Emit UI-only IterationCompleted phase for this region.
cp.workflowExecutionCoordinator.markRegionIterationCompletedIfNeeded(region)

// Keep scheduler running
cp.workflowExecutionCoordinator
.coordinateRegionExecutors(cp.actorService)
.onFailure {
case err: WorkflowRuntimeException =>
sendToClient(FatalError(err, err.relatedWorkerId))
case other =>
sendToClient(FatalError(other, None))
}
case None =>
}

EmptyReturn()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,16 @@
package org.apache.texera.amber.engine.architecture.messaginglayer

import org.apache.texera.amber.core.state.State
import org.apache.texera.amber.core.storage.DocumentFactory
import org.apache.texera.amber.core.storage.{DocumentFactory, VFSResourceType}
import org.apache.texera.amber.core.storage.model.BufferedItemWriter
import org.apache.texera.amber.core.storage.result.ResultSchema
import org.apache.texera.amber.core.tuple._
import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity}
import org.apache.texera.amber.core.workflow.{PhysicalLink, PortIdentity}
import org.apache.texera.amber.engine.architecture.messaginglayer.OutputManager.{
DPOutputIterator,
getBatchSize,
toPartitioner
}
import org.apache.texera.amber.engine.architecture.messaginglayer.OutputManager.{DPOutputIterator, getBatchSize, toPartitioner}
import org.apache.texera.amber.engine.architecture.sendsemantics.partitioners._
import org.apache.texera.amber.engine.architecture.sendsemantics.partitionings._
import org.apache.texera.amber.engine.architecture.worker.managers.{
OutputPortResultWriterThread,
PortStorageWriterTerminateSignal
}
import org.apache.texera.amber.engine.architecture.worker.managers.{OutputPortResultWriterThread, PortStorageWriterTerminateSignal}
import org.apache.texera.amber.engine.common.AmberLogging
import org.apache.texera.amber.util.VirtualIdentityUtils

Expand Down Expand Up @@ -124,6 +118,10 @@ class OutputManager(
: mutable.HashMap[PortIdentity, OutputPortResultWriterThread] =
mutable.HashMap()

val ECMWriterThreads
: mutable.HashMap[PortIdentity, BufferedItemWriter[Tuple]] =
mutable.HashMap()

/**
* Add down stream operator and its corresponding Partitioner.
*
Expand Down Expand Up @@ -245,7 +243,6 @@ class OutputManager(
writerThread.join()
case None =>
}

}

def getPort(portId: PortIdentity): WorkerPort = ports(portId)
Expand All @@ -260,6 +257,13 @@ class OutputManager(
outputIterator.appendSpecialTupleToEnd(FinalizeExecutor())
}

def finalizeIteration(worker: ActorVirtualIdentity): Unit = {
this.ports.keys
.foreach(outputPortId =>
outputIterator.appendSpecialTupleToEnd(FinalizeIteration(outputPortId, worker))
)
}

/**
* This method is only used for ensuring correct region execution. Some operators may have input port dependency
* relationships, for which we currently use a two-phase region execution scheme. (See `RegionExecutionCoordinator`
Expand All @@ -280,12 +284,18 @@ class OutputManager(
}

private def setupOutputStorageWriterThread(portId: PortIdentity, storageUri: URI): Unit = {
val bufferedItemWriter = DocumentFactory
this.ECMWriterThreads(portId) = DocumentFactory
.createDocument(storageUri.resolve("ecm"), ResultSchema.ecmSchema)
.writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
.asInstanceOf[BufferedItemWriter[Tuple]]

val bufferedTupleWriter = DocumentFactory
.openDocument(storageUri)
._1
.writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
.asInstanceOf[BufferedItemWriter[Tuple]]
val writerThread = new OutputPortResultWriterThread(bufferedItemWriter)

val writerThread = new OutputPortResultWriterThread(bufferedTupleWriter)
this.outputPortResultWriterThreads(portId) = writerThread
writerThread.start()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class RegionExecutionCoordinator(
private case object Unexecuted extends RegionExecutionPhase
private case object ExecutingDependeePortsPhase extends RegionExecutionPhase
private case object ExecutingNonDependeePortsPhase extends RegionExecutionPhase
private case object IterationCompleted extends RegionExecutionPhase
private case object Completed extends RegionExecutionPhase

private val currentPhaseRef: AtomicReference[RegionExecutionPhase] = new AtomicReference(
Expand Down Expand Up @@ -190,7 +191,8 @@ class RegionExecutionCoordinator(
}
}

def isCompleted: Boolean = currentPhaseRef.get == Completed
// Treat IterationCompleted as not completed from scheduler perspective.
def isCompleted: Boolean = currentPhaseRef.get == Completed || currentPhaseRef.get == IterationCompleted

/**
* This will sync and transition the region execution phase from one to another depending on its current phase:
Expand Down Expand Up @@ -219,6 +221,9 @@ class RegionExecutionCoordinator(
}
case ExecutingNonDependeePortsPhase =>
tryCompleteRegionExecution()
case IterationCompleted =>
// IterationCompleted is a UI/observability phase; scheduling doesn't advance on it.
Future.Unit
case Completed =>
// Already completed, no further action needed.
Future.Unit
Expand Down Expand Up @@ -543,6 +548,13 @@ class RegionExecutionCoordinator(
}
}

/** Emit IterationCompleted region phase to frontend. This does not affect scheduling semantics. */
def setIterationCompletedPhase(): Unit = {
if (currentPhaseRef.get != Completed) {
setPhase(IterationCompleted)
}
}

private def setPhase(phase: RegionExecutionPhase): Unit = {
currentPhaseRef.set(phase)
SessionState.getAllSessionStates.foreach { state =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,22 @@ package org.apache.texera.amber.engine.architecture.scheduling

case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterator[Set[Region]] {
private var currentLevel = levelSets.keys.minOption.getOrElse(0)

private var loopStartLevel = currentLevel
def getRegions: List[Region] = levelSets.values.flatten.toList

override def hasNext: Boolean = levelSets.isDefinedAt(currentLevel)

override def next(): Set[Region] = {
val regions = levelSets(currentLevel)
if(regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopStart-operator-")))) loopStartLevel = currentLevel
currentLevel += 1
regions
}

def loopNext(): Set[Region] = {
val regions = levelSets(currentLevel)
if(regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopEnd-operator-")))) currentLevel = loopStartLevel
else currentLevel += 1
regions
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import org.apache.texera.amber.engine.architecture.common.{
AkkaActorService
}
import org.apache.texera.amber.engine.architecture.controller.ControllerConfig
import org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
import org.apache.texera.amber.engine.architecture.controller.execution.{OperatorExecution, RegionExecution, WorkflowExecution}
import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState
import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient

import scala.collection.mutable
Expand Down Expand Up @@ -102,6 +103,27 @@ class WorkflowExecutionCoordinator(
.unit
}

/**
* Mark the given operator as iteration-completed.
*
* This is a UI/observability state only; it doesn't change the worker-level state machine.
*/
def markOperatorIterationCompleted(operatorExecution: OperatorExecution): Unit = {
// No-op placeholder: operator aggregated state is computed from WorkerState.
// IterationCompleted is currently an observability/UI state driven by region phase events.
()
}

/**
* If all operators in the region have reached ITERATION_COMPLETED (or COMPLETED), mark the region as
* ITERATION_COMPLETED (region is still considered not completed, so scheduling remains unchanged).
*/
def markRegionIterationCompletedIfNeeded(region: Region): Unit = {
// Best-effort: tell region coordinator to emit an IterationCompleted phase event.
// This doesn't affect scheduling/completion semantics.
regionExecutionCoordinators.get(region.id).foreach(_.setIterationCompletedPhase())
}

def getRegionOfLink(link: PhysicalLink): Region = {
getExecutingRegions.find(region => region.getLinks.contains(link)).get
}
Expand Down
Loading
Loading