From fae4293eee4da118153ea4108708c2bcb72eb5a9 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sat, 8 Nov 2025 21:00:57 -0800 Subject: [PATCH 01/15] init --- .../messaginglayer/OutputManager.scala | 4 ++ .../core/executor/OperatorExecutor.scala | 5 ++ .../apache/amber/core/tuple/TupleLike.scala | 2 + .../org/apache/amber/operator/LogicalOp.scala | 20 ++---- .../amber/operator/loop/LoopEndOpDesc.scala | 53 ++++++++++++++ .../amber/operator/loop/LoopEndOpExec.scala | 8 +++ .../amber/operator/loop/LoopStartOpDesc.scala | 66 ++++++++++++++++++ .../amber/operator/loop/LoopStartOpExec.scala | 47 +++++++++++++ .../src/assets/operator_images/LoopEnd.png | Bin 0 -> 5865 bytes .../src/assets/operator_images/LoopStart.png | Bin 0 -> 2138 bytes 10 files changed, 191 insertions(+), 14 deletions(-) create mode 100644 common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpDesc.scala create mode 100644 common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpExec.scala create mode 100644 common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpDesc.scala create mode 100644 common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpExec.scala create mode 100644 frontend/src/assets/operator_images/LoopEnd.png create mode 100644 frontend/src/assets/operator_images/LoopStart.png diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/messaginglayer/OutputManager.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/messaginglayer/OutputManager.scala index e786ea432eb..ac8764c7beb 100644 --- a/amber/src/main/scala/org/apache/amber/engine/architecture/messaginglayer/OutputManager.scala +++ b/amber/src/main/scala/org/apache/amber/engine/architecture/messaginglayer/OutputManager.scala @@ -260,6 +260,10 @@ class OutputManager( outputIterator.appendSpecialTupleToEnd(FinalizeExecutor()) } + def finalizeIteration(worker: ActorVirtualIdentity): Unit = { + outputIterator.appendSpecialTupleToEnd(FinalizeIteration(worker)) + } + /** * This method is only used for ensuring correct region execution. Some operators may have input port dependency * relationships, for which we currently use a two-phase region execution scheme. (See `RegionExecutionCoordinator` diff --git a/common/workflow-core/src/main/scala/org/apache/amber/core/executor/OperatorExecutor.scala b/common/workflow-core/src/main/scala/org/apache/amber/core/executor/OperatorExecutor.scala index 69e62a8f308..c4839b584d6 100644 --- a/common/workflow-core/src/main/scala/org/apache/amber/core/executor/OperatorExecutor.scala +++ b/common/workflow-core/src/main/scala/org/apache/amber/core/executor/OperatorExecutor.scala @@ -56,4 +56,9 @@ trait OperatorExecutor { def close(): Unit = {} + def reset(): Unit = { + close() + open() + } + } diff --git a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/TupleLike.scala b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/TupleLike.scala index a96da69d407..8e789fc2ccd 100644 --- a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/TupleLike.scala +++ b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/TupleLike.scala @@ -19,6 +19,7 @@ package org.apache.amber.core.tuple +import org.apache.amber.core.virtualidentity.ActorVirtualIdentity import org.apache.amber.core.workflow.PortIdentity import scala.jdk.CollectionConverters.CollectionHasAsScala @@ -41,6 +42,7 @@ trait InternalMarker extends TupleLike { final case class FinalizePort(portId: PortIdentity, input: Boolean) extends InternalMarker final case class FinalizeExecutor() extends InternalMarker +final case class FinalizeIteration(worker: ActorVirtualIdentity) extends InternalMarker trait SeqTupleLike extends TupleLike with SchemaEnforceable { override def inMemSize: Long = ??? diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/LogicalOp.scala b/common/workflow-operator/src/main/scala/org/apache/amber/operator/LogicalOp.scala index 6fb27d92c3f..05cf952610a 100644 --- a/common/workflow-operator/src/main/scala/org/apache/amber/operator/LogicalOp.scala +++ b/common/workflow-operator/src/main/scala/org/apache/amber/operator/LogicalOp.scala @@ -35,22 +35,15 @@ import org.apache.amber.operator.distinct.DistinctOpDesc import org.apache.amber.operator.dummy.DummyOpDesc import org.apache.amber.operator.filter.SpecializedFilterOpDesc import org.apache.amber.operator.hashJoin.HashJoinOpDesc -import org.apache.amber.operator.huggingFace.{ - HuggingFaceIrisLogisticRegressionOpDesc, - HuggingFaceSentimentAnalysisOpDesc, - HuggingFaceSpamSMSDetectionOpDesc, - HuggingFaceTextSummarizationOpDesc -} +import org.apache.amber.operator.huggingFace.{HuggingFaceIrisLogisticRegressionOpDesc, HuggingFaceSentimentAnalysisOpDesc, HuggingFaceSpamSMSDetectionOpDesc, HuggingFaceTextSummarizationOpDesc} import org.apache.amber.operator.ifStatement.IfOpDesc import org.apache.amber.operator.intersect.IntersectOpDesc import org.apache.amber.operator.intervalJoin.IntervalJoinOpDesc import org.apache.amber.operator.keywordSearch.KeywordSearchOpDesc import org.apache.amber.operator.limit.LimitOpDesc +import org.apache.amber.operator.loop.{LoopEndOpDesc, LoopStartOpDesc} import org.apache.amber.operator.machineLearning.Scorer.MachineLearningScorerOpDesc -import org.apache.amber.operator.machineLearning.sklearnAdvanced.KNNTrainer.{ - SklearnAdvancedKNNClassifierTrainerOpDesc, - SklearnAdvancedKNNRegressorTrainerOpDesc -} +import org.apache.amber.operator.machineLearning.sklearnAdvanced.KNNTrainer.{SklearnAdvancedKNNClassifierTrainerOpDesc, SklearnAdvancedKNNRegressorTrainerOpDesc} import org.apache.amber.operator.machineLearning.sklearnAdvanced.SVCTrainer.SklearnAdvancedSVCTrainerOpDesc import org.apache.amber.operator.machineLearning.sklearnAdvanced.SVRTrainer.SklearnAdvancedSVRTrainerOpDesc import org.apache.amber.operator.metadata.{OPVersion, OperatorInfo, PropertyNameConstants} @@ -64,10 +57,7 @@ import org.apache.amber.operator.sleep.SleepOpDesc import org.apache.amber.operator.sort.{SortOpDesc, StableMergeSortOpDesc} import org.apache.amber.operator.sortPartitions.SortPartitionsOpDesc import org.apache.amber.operator.source.apis.reddit.RedditSearchSourceOpDesc -import org.apache.amber.operator.source.apis.twitter.v2.{ - TwitterFullArchiveSearchSourceOpDesc, - TwitterSearchSourceOpDesc -} +import org.apache.amber.operator.source.apis.twitter.v2.{TwitterFullArchiveSearchSourceOpDesc, TwitterSearchSourceOpDesc} import org.apache.amber.operator.source.fetcher.URLFetcherOpDesc import org.apache.amber.operator.source.scan.FileScanSourceOpDesc import org.apache.amber.operator.source.scan.arrow.ArrowSourceOpDesc @@ -195,6 +185,8 @@ trait StateTransferFunc new Type(value = classOf[TypeCastingOpDesc], name = "TypeCasting"), new Type(value = classOf[LimitOpDesc], name = "Limit"), new Type(value = classOf[SleepOpDesc], name = "Sleep"), + new Type(value = classOf[LoopStartOpDesc], name = "LoopStart"), + new Type(value = classOf[LoopEndOpDesc], name = "LoopEnd"), new Type(value = classOf[RandomKSamplingOpDesc], name = "RandomKSampling"), new Type(value = classOf[ReservoirSamplingOpDesc], name = "ReservoirSampling"), new Type(value = classOf[HashJoinOpDesc[String]], name = "HashJoin"), diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpDesc.scala new file mode 100644 index 00000000000..8ebe91483c3 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpDesc.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.amber.operator.loop + +import org.apache.amber.core.executor.OpExecWithClassName +import org.apache.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.amber.core.workflow.{InputPort, OutputPort, PhysicalOp} +import org.apache.amber.operator.LogicalOp +import org.apache.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} + +class LoopEndOpDesc extends LogicalOp { + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = { + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithClassName("edu.uci.ics.amber.operator.loop.LoopEndOpExec") + ) + .withInputPorts(operatorInfo.inputPorts) + .withOutputPorts(operatorInfo.outputPorts) + .withSuggestedWorkerNum(1) + } + + override def operatorInfo: OperatorInfo = + OperatorInfo( + "Loop End", + "Loop End", + OperatorGroupConstants.CONTROL_GROUP, + inputPorts = List(InputPort()), + outputPorts = List(OutputPort()) + ) +} \ No newline at end of file diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpExec.scala new file mode 100644 index 00000000000..a98081f1814 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpExec.scala @@ -0,0 +1,8 @@ +package org.apache.amber.operator.loop + +import org.apache.amber.core.executor.OperatorExecutor +import org.apache.amber.core.tuple.{Tuple, TupleLike} + +class LoopEndOpExec extends OperatorExecutor { + override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = Iterator(tuple) +} \ No newline at end of file diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpDesc.scala new file mode 100644 index 00000000000..67d7a503c30 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpDesc.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.amber.operator.loop + +import com.fasterxml.jackson.annotation.JsonProperty +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import org.apache.amber.core.executor.OpExecWithClassName +import org.apache.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.amber.core.workflow.{InputPort, OutputPort, PhysicalOp} +import org.apache.amber.operator.LogicalOp +import org.apache.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} +import org.apache.amber.util.JSONUtils.objectMapper + +class LoopStartOpDesc extends LogicalOp { + + @JsonProperty(required = true) + @JsonSchemaTitle("Iteration Number") + var iteration: Int = _ + + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = { + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithClassName( + "edu.uci.ics.amber.operator.loop.LoopStartOpExec", + objectMapper.writeValueAsString(this) + ) + ) + .withInputPorts(operatorInfo.inputPorts) + .withOutputPorts(operatorInfo.outputPorts) + .withSuggestedWorkerNum(1) + .withParallelizable(false) + } + + override def operatorInfo: OperatorInfo = + OperatorInfo( + "Loop Start", + "Loop Start", + OperatorGroupConstants.CONTROL_GROUP, + inputPorts = List(InputPort()), + outputPorts = List(OutputPort()) + ) + +} \ No newline at end of file diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpExec.scala new file mode 100644 index 00000000000..cd9698701ed --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpExec.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.amber.operator.loop + +import org.apache.amber.core.executor.OperatorExecutor +import org.apache.amber.core.tuple.{Tuple, TupleLike} +import org.apache.amber.util.JSONUtils.objectMapper + +import scala.collection.mutable + +class LoopStartOpExec(descString: String) extends OperatorExecutor { + private val desc: LoopStartOpDesc = objectMapper.readValue(descString, classOf[LoopStartOpDesc]) + private val data = new mutable.ArrayBuffer[Tuple] + private var currentIteration = 0 + + def checkCondition(): Boolean = { + desc.iteration > currentIteration + } + + override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = { + data.append(tuple) + Iterator.empty + } + + override def onFinish(port: Int): Iterator[TupleLike] = { + currentIteration += 1 + data.iterator + } + +} \ No newline at end of file diff --git a/frontend/src/assets/operator_images/LoopEnd.png b/frontend/src/assets/operator_images/LoopEnd.png new file mode 100644 index 0000000000000000000000000000000000000000..ee0f9ab6faccd328c102f214a7547d3e34d8359b GIT binary patch literal 5865 zcmVPy0qe(P?J^Czx6vRgiNy(U0Jp~gW+wJeo>FF6J zv)#V8`#arz@9BT?$35r!zVkc3^qfB5Mles!2yn63hL)Czs8n_X^S=Vv1(x+u03U(x zw}GeyL>8ERFmHopZ36HFlo}CsI${gOizOdGK38`k{#z@ z0LKG37R*P3=tuxoiu(3&tdALqidki^V-aG#x}8IXROixtAl!S_|ebFwduQ`ESz7 zbjWWA!Ukv4O65`j7wZ5mr#j}LP?(eVvu>^l%f;g1u&m1fd@ZLWO2h2?;QMz`x%|g8 zl*iSXOTu!ocp!+b29XmNfTo?#?1K3=DwqG9Ck%2$Se`Ls0yb^B0l<}c0y*Qh-j24m zSu}U<#*Et;n3o&Evg2F~=9?0Ai{@d6bl9le!$dcd>(0}mE{=}O6P9iJSP*>=z$tMS zYUV~lU>TUNAtNe;3TxWfHkYHj3yLDwV#I#U*Bvu(0mQ2kYVYyhm7M68OmOyH1TYUdIDwl7~3I>@WY#2Q?4@75X1sr2X zhye5BAi6jU(P5b&EIZC=0J;F|X-JY;1XL(v2f6Og)3h*k!m{HiPt~1if?#ylguvzG zx_8wyBf-I`5|(XyE{HBka6Lwp4g{F*AgLzA@=ZJefa|EJsK{hR*vTAn) zvr1r8XZ5{7q#gf6nX9h83WX`u)%BM|<&GgtD_}u%716^WQrV|@dR`3@0)bb+yaLQ` zpj=)>ij;LetjJs}?hm4aK{Oe_VPWFqp>>K(XBzV=FrTL@`b!&O*>R2s^P@S51$YU- zQ}F#~FmBwlG=KiPy4b+er|*m{TTX;!oebd906w8hcinpWViC?E*L}Wj84)LHA#CWm zQg?Z~M6@f7VN0Oy8(UkuY5x44Gz=I?Cr_U~5nH#aq)Bx@eO6ZN8fFKxDiCs}v}a75 zu;H8aiHwrl6>_cRw#T%;`Ts|C%qXH zCfrK1XKzk@12q}IGiUCAjT^58aD4{udArGVznJ9G6DMpY+^t0SgXji{KS@b~FDWZH zk}=N$^H(MHRF$2u8#htHW+0G7m8-mp%H>C^0@)-3xL7;~mi2uAs$6JVPhtTxi4iso z5qu&oM0(wxcVokb%jltpDk<8KN!5J*`33B>lZvqVW{L)?>#o?LiHQ(42)+hhPAQ(X zM|~kvSJxvcZbmLtb4SOy!A(gzw+Skq_L#Uq;K>lywwI)mDZsoGt*z6uM_=89)n4R+c?}AMbE&KA_w_8)%o!a4?&vrQmC7R^+D98s zbr$QWi?Ct&pck|)4p^x|>r59FImE**I!md%5|zs7v~b~@ za@%DjD+KtWiw;1cpzhLjIT@Cb>#892$WP6L4XOwN_e9nZXP`O;5Piu3M+uYzy z%ls>i$Ae31shU_5VZ$h_H#Nn#z6;Ab)tp&tBU}=rgEOt~KdI$IOvTTRAAdm9sIJ-w z>o`BqR!o@aG;yXu5>9Cha^-C^)lf%q~72=*S-=8>I zZ``p7Yuh*UW?tSTiiYoh$>d;(BAlr3Fh}fXi3%MmjQK|LyjdgZ7@M$;vqrQlHqrf5 zDw#g9qPw5GaBgqEPdFQQExGQ9kz=D0mK|pbfJMnKHT^Y zMPKzBD26Fh$#oYGoj(>~Ws6}j&mzyeF~)+-)D#h5+g5Ed5Pn*R{b*Koqh_IllnmX_TcsXlA7Gd8GCyKUP#v7*XDUU0(+ zYunF)=me3CV4g#sH&aBm5tOzqzWo0|1dvCo_Auu;Oz(-*2q$45}AkS$UK*6 zmjNaJQiJmKeQlwnp1t~Oa&1po+r9`yWg)Fklk5JwkZdD1Wdzu9{#&e32y>ACT^(h7uwg|uzS-Z)C6@fKtLWQ$zZF1f6 zV8Y5}7PPeNX>zr)Mz7@U33IlpahXP?$S!WN5%!?I-R4*32VR4%94T_&r0 z+mLo~v3NQxt6OAos%}9@*b8A<|0SZNrDYcrJ0OB=CWVHv1@DL$=llQBM_9*E318O> zD0!7!_b>suMr4K&V8?j{z@Y*r-rPr6+kOy4QxjLfd@p(4MTyHb0yB*O+x9{bjak8^ zt_3hJ>L=_=AgZfL!kCIpW3O=k>X`?T{}k@8&eZRfE`DLs;ZFHL?1=i%K+S&sMJ1Trbw{> zsCf}7Rs2{T39f}@-68;liP91PH7_EiqPEm6flSe2Q=Kp`+!C=%5_I;46G)W3 zyop>_$u>_DA|S;p^M-9gGU-^d+#A-MVRh_Z)9H{q!gG@pR>g2Kg~n&eLUL3G%~&99b$COJLi_g6M_Bu`tnEi9?zvfpXDW zCLT6VJjjKuvhlDc;~*C#$di$x83zgowr!Ksl_y4I+*ZLD>-8pUHshP0NAq-K&6>Id zd1OXLZI-V~U@E_4bnEkJmVD(GIp+ajYHQ__b#+>;r?!@zzhRSXiMBdJu9X|E(^v=Lkl0d8+U0?dD? zHZI2Rdy(q~9TvCi<{jS0J`17~VhA67D)SujyqO}hjnLE(VB4MzqDw^#V1AA~@5FxD z!wD-_r$Fi4jFy(&P2Vn&txZK}7(1{Izz!k?`2KZNF5fyjVW&;|7%G)lMU*R{Lktkp zV+3nKK(Lr#pZ$fxp|oJZYoii2JQhf~X_fdMluCP<-UnLvB&sCH-D+=F-#imV6on0u z2g@FIBnSvwEM5i6x=qB0KmlJ#o_B|sd?Q#30%Dm3{nq>b)l@EjKMG+d!Lt4$xaxJ- zamRgW_Uz4qI*e#R1bF7m9q`_JZ}nCU+FeYI@9#(D^6ILD4PJtdvlPG?V#b*Iv0~8b z5G+-laFFfY80wY`R)Y=F;W5xu0@PTs4?G{CN%rWo6 zvQDM0E_K*9k46NzqvI&}{*wT97FWx>9^=O!Ky&AAjH0V1!iH~IslHyr?PXpAqR)}* zz7b_HW=wnp*l`X9^RGZOqH8q1hG7yeA=jN3Rc_6M4N3bHfK#F>i8*d1DwWe|;lej# z$}!WcAQ1K@ctYHh^`RZh$aOzm%f{44SgBS?qg$>%_RcbI+FG`@wz0vR6~)Q=Oh5nL{*m9 zICD>%2$WjyS+}?NWOCi#)w76Lgbm)ZvULo`T9fgJ)wAQ_9Bm$~p*F@$^N&r~@My3E zMEYX@n3tlpb-D?diO$?0kY(%E`C4KFhML9vICw4S@MVZ&u6b746A zVWz-c0KS>l<^VXITsO9>H!;G7q`eHlohgE{625+>ast=4dB5}fo zZ(7+R98uP$>K*_$W5R@6o2kM~P?2Th#%lpwFI+bu$}Wsp3?s_K(ZWvu@e)4RuqJ(%1h8UwOO=g~Defj$~ql4M?kU!H9mTDSkF8 z-#3GK0W8bSbNTw9OuO$pAetstOgJi{gU6}(R_4!F%gw4sw3;JI*zi1iJeZ#V(Ju8G zDLC(G06)Q|O^fJ(2mT?bClS$MYoVzCz5?Kri3m+n2=hB2QrB0{Cn7BXQ5~6jL?%9%S$9hqCjpRS?!#EGZ;7SS)FIz@JxClEbC+dD!BO*Q8zQSaq1XAbdKbkwSR5e2s`LZ`*4On z#G>x;t6^K8SHQdi%x|DvUKQuoA^55<7WW6y!62Fp;4ly!3E%&v|=H`>}R zNcx_&n%Yzd+iw)xcKUAIY6zL%ha-@%yR{aGQzdNpF_w0~Of7hsu|Q&hEcJkyI$=Z7 zs?bDLnkbEU+Inzn#yKjUwIhiT!8qV$g0LZJ_XP92R5BOhAlM9Z`|i*$jvIWP1XP(J z?7+D;y&SP5@N7I8F>|nz1|)NY4d1rPGxo#WUwtLLTRP>bD=VUJNqeqpT~a0qJMgyE zU96*4^cr=1okcEmz>#Uf25;NGvPj?4Sj$TX?pf8b49wTmT0ThUVlq$Ifw!&WT-@t9 zx+y2I2s-yLzOKGlfE#P6A{bw3eG+m**x+s3=OMd6TT$iuFwNVn+tJoGOSi|WUXXG{ z*g-c52ZHEoO+6Rt#WvgXm|Za67NtykmKKsr!VV(t;jk>_DKq_QqqFKh`2JnGqN}2t z6>Vg$2|GaCzUFr-r%6=@H_ylj%tN6thZZb&EzbZo1YrlgarXexbP#m};d|y;3+65` z&mXOKYu+GgD8dc~pB-lkfUh@c2xB*Zd&zYd=Z%on>}^=W4tncO0x%UsUv6m6net3A z{}jL?DwkJR+xWcx-tdIo?yalNx?8uN4I-7)cS2q_ZNP5k=fK>J*4D@Jem0G|7fnUj zAx8!k9rjreoeJPnIXUyHl$_s!`AHD{It$TLQHhq+xTYoSPyoX&D#rsj7R*P734vBp zK-VG14-y}VmIF|oR-QLqbTkywsAHotF@-xjEv#PsaR8HHS%-jGm8(=`m-bE6J*?cK zz+1f^h^s)P%7wlTUk`V)0_`k4SDLq_~R%4M5l?=V1$00000NkvXXu0mjf{#zRE literal 0 HcmV?d00001 diff --git a/frontend/src/assets/operator_images/LoopStart.png b/frontend/src/assets/operator_images/LoopStart.png new file mode 100644 index 0000000000000000000000000000000000000000..7e5be023cdf6b64dd1bc140e5d75980455afeefb GIT binary patch literal 2138 zcmV-g2&MOlP)Px-6iGxuRCr$Pok@1vFbsy}TQ#e+JKw6wt+cDNN^a$bl4voq27m;>LBHSYqAkVY z$0rb+#7kG<@A`TD-M{p=3agGPM=EfMd@k!_*RSip?%sX$Fa2BAilA^a1?mO0>wb6l z?j5k2v68i*x_G;KNw`$Lu656Tc9Eo zuwW)j8SQ}zShIknU1*!20#9EcQB z6cz^>1a(cxTka+6Y@h~k>Vdf?xiG9J0X0-|fOc49G29qr3Tieum%s|bdO}cVHSgWK zRd`?mYB7L@zzV^7YEZqNBe1F-Ru$AD0sZM+llKS!)Id<7mwzo{Qv-F_Gg`;Ajf=V5 zcEPBP*Z_3}D+OT1L9Nzsk<(YzwwE$XlA1;kSW!?b@}lb|h!7T_9?%@2san@xK`cu5 z4NPGHDnVMNrYfLLkh*xZogYJ3fJ!!d(!e#T>u=Ub0F_K1J7MHlAR1IaI-00i9oPgi z9*BT73e;we!|nF0U1^@ zs7;RU1&&!a`tnAT$9|9)ZqY&gy58B9DER@RKu1jgs7DGB{-0nA5~$U$I4!jz2t$w; zqkgJLlynDUghNo9^B97VVtR@~AU#e0tKZ~t7t{tJ75|b-a~jdPl25tgMD{_gXRF?k zXh|dcz09jVg*qRav^@^AC+J29unB4*p2DAq9F9D=pW8RejG4((AV;*f{{Mr`-7oK{ z+EsgPgPL!YKX^Y6rKgkdgTv61*9U7K6zNnkYM3epFlN!gQDWcT~H&}?8PnHV1_x_cm~zVnY^)I>;EKn0{3-V~!vN)Oxq z1Xa7+7l8B!s6zQ=uyYedn!q}B{|`_Nx_vRvhYQil;ar^?Yf$$#9B^gF6_Z@Wbki%x ztAm4^J*sB6k1kr5l+3wnhXBsW4WL>Slvzs@udINoSG{+mM^%k8;7yegDI0JR1JDygFuPXM(N0BWhGCaCVhT#aw=V=z?de(toRjZVSEj!2b;lCwiOAI&NI40$xA|c}btX{twu`E7^XstQe;!h`;?wV2 z%Jo4-_kRvhHM@QHbw`UcQ|SIrz^bA*?VPi-I)#csRl~5VdQ*UDgxEX`@Jqdlp9+)We4}*#tzgGR)hetLR`( zU|H*=iJ9@^;87n?j-wUVwYy&yI3H%)=vntJ?akv=w?Xq4_}$+EM50(-LrH< z#{4!zlKpGYwa)zOG_vem6=HDBqaRts+|g(kR99Km4F_4&$W*8>sEl|tQ-hhs9_Vun z>gTnz5P+#bA;|?nZ4OexEW{zGgI7fekWRZ0s4d!pa$Jc7Dqv0LRAQSWb7U`2X@e|? zo0Q0)PVs8qDJuir)QU*(sEv*6XNeh_us{W@2^ces5|+~3$lf=OsRtia46fp6Wg4dG za4SL!HmH+;m@# zDsgnbrDHOvO~+D)^f3}JKy6IZ+tl4B?Q;+TwJ~8C^f8h$L2X1+(!N)UiejKPA}mF- zxHJ)4oya1?$hF7Y9dSk14)CI8Pk&?3lD+Fpmu$lmB4(g-(a!au8T+XF? z&3-<|(l-l&nh{o0IL%^I3J3n{a?L|445|^V{+yco^kdRID20W+G-h2trJ@70V|d`a zt^4IY5lN{uHH`wO6RauwE@t@doj)}B__y3}7UfZ?b`FmA)(Wr6Db&ar0hK8vsx4#E z73EQ58h~eJ1sL0ssI2 literal 0 HcmV?d00001 From 08e414d4df87cb7f3acccdca57500a45fa1c95df Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sat, 8 Nov 2025 21:23:23 -0800 Subject: [PATCH 02/15] update --- .../architecture/rpc/controlcommands.proto | 5 ++ .../architecture/rpc/workerservice.proto | 2 + .../architecture/worker/DataProcessor.scala | 67 ++++++++++--------- .../DataProcessorRPCHandlerInitializer.scala | 2 + .../promisehandlers/EndChannelHandler.scala | 22 +++--- .../promisehandlers/EndIterationHandler.scala | 44 ++++++++++++ .../NextIterationHandler.scala | 43 ++++++++++++ 7 files changed, 141 insertions(+), 44 deletions(-) create mode 100644 amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala create mode 100644 amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala diff --git a/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controlcommands.proto b/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controlcommands.proto index 41f0976314c..1bb392428fb 100644 --- a/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controlcommands.proto +++ b/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controlcommands.proto @@ -57,6 +57,7 @@ message ControlRequest { EmptyRequest emptyRequest = 56; PrepareCheckpointRequest prepareCheckpointRequest = 57; QueryStatisticsRequest queryStatisticsRequest = 58; + EndIterationRequest endIterationRequest = 59; // request for testing Ping ping = 100; @@ -271,4 +272,8 @@ message PrepareCheckpointRequest{ message QueryStatisticsRequest{ repeated core.ActorVirtualIdentity filterByWorkers = 1; +} + +message EndIterationRequest{ + core.ActorVirtualIdentity worker = 1 [(scalapb.field).no_box = true]; } \ No newline at end of file diff --git a/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/workerservice.proto b/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/workerservice.proto index 3b5b38ad7f7..9f335596190 100644 --- a/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/workerservice.proto +++ b/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/workerservice.proto @@ -47,6 +47,8 @@ service WorkerService { rpc EndWorker(EmptyRequest) returns (EmptyReturn); rpc StartChannel(EmptyRequest) returns (EmptyReturn); rpc EndChannel(EmptyRequest) returns (EmptyReturn); + rpc EndIteration(EndIterationRequest) returns (EmptyReturn); + rpc NextIteration(EmptyRequest) returns (EmptyReturn); rpc DebugCommand(DebugCommandRequest) returns (EmptyReturn); rpc EvaluatePythonExpression(EvaluatePythonExpressionRequest) returns (EvaluatedValue); rpc NoOperation(EmptyRequest) returns (EmptyReturn); diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/DataProcessor.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/DataProcessor.scala index e97bfe19ed8..c6e4f6273e2 100644 --- a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/DataProcessor.scala +++ b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/DataProcessor.scala @@ -24,36 +24,18 @@ import io.grpc.MethodDescriptor import org.apache.amber.core.executor.OperatorExecutor import org.apache.amber.core.state.State import org.apache.amber.core.tuple._ -import org.apache.amber.core.virtualidentity.{ - ActorVirtualIdentity, - ChannelIdentity, - EmbeddedControlMessageIdentity -} +import org.apache.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity, EmbeddedControlMessageIdentity} import org.apache.amber.core.workflow.PortIdentity import org.apache.amber.engine.architecture.common.AmberProcessor import org.apache.amber.engine.architecture.logreplay.ReplayLogManager -import org.apache.amber.engine.architecture.messaginglayer.{ - InputManager, - OutputManager, - WorkerTimerService -} -import org.apache.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.{ - NO_ALIGNMENT, - PORT_ALIGNMENT -} +import org.apache.amber.engine.architecture.messaginglayer.{InputManager, OutputManager, WorkerTimerService} +import org.apache.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.{NO_ALIGNMENT, PORT_ALIGNMENT} import org.apache.amber.engine.architecture.rpc.controlcommands._ import org.apache.amber.engine.architecture.rpc.controlreturns.EmptyReturn -import org.apache.amber.engine.architecture.rpc.workerservice.WorkerServiceGrpc.METHOD_END_CHANNEL -import org.apache.amber.engine.architecture.worker.WorkflowWorker.{ - DPInputQueueElement, - MainThreadDelegateMessage -} +import org.apache.amber.engine.architecture.rpc.workerservice.WorkerServiceGrpc.{METHOD_END_CHANNEL, METHOD_END_ITERATION} +import org.apache.amber.engine.architecture.worker.WorkflowWorker.{DPInputQueueElement, MainThreadDelegateMessage} import org.apache.amber.engine.architecture.worker.managers.SerializationManager -import org.apache.amber.engine.architecture.worker.statistics.WorkerState.{ - COMPLETED, - READY, - RUNNING -} +import org.apache.amber.engine.architecture.worker.statistics.WorkerState.{COMPLETED, READY, RUNNING} import org.apache.amber.engine.architecture.worker.statistics.WorkerStatistics import org.apache.amber.engine.common.ambermessage._ import org.apache.amber.engine.common.statetransition.WorkerStateManager @@ -157,7 +139,7 @@ class DataProcessor( if (outputTuple == null) return outputTuple match { case FinalizeExecutor() => - sendECMToDataChannels(METHOD_END_CHANNEL, PORT_ALIGNMENT) + sendECMToDataChannels(METHOD_END_CHANNEL.getBareMethodName, PORT_ALIGNMENT) // Send Completed signal to worker actor. executor.close() adaptiveBatchingMonitor.stopAdaptiveBatching() @@ -179,6 +161,13 @@ class DataProcessor( PortCompletedRequest(portId, input), asyncRPCClient.mkContext(CONTROLLER) ) + case FinalizeIteration(worker: ActorVirtualIdentity) => + sendECMToDataChannels( + METHOD_END_ITERATION.getBareMethodName, + PORT_ALIGNMENT, + EndIterationRequest(worker) + ) + executor.reset() case schemaEnforceable: SchemaEnforceable => val portIdentity = outputPortOpt.getOrElse(outputManager.getSingleOutputPortIdentity) val tuple = schemaEnforceable.enforceSchema(outputManager.getPort(portIdentity).schema) @@ -264,23 +253,41 @@ class DataProcessor( } } + def processOnFinish(): Unit = { + val portId = inputGateway.getChannel(inputManager.currentChannelId).getPortId + try { + val outputState = executor.produceStateOnFinish(portId.id) + if (outputState.isDefined) { + outputManager.emitState(outputState.get) + } + outputManager.outputIterator.setTupleOutput( + executor.onFinishMultiPort(portId.id) + ) + } catch safely { + case e => + // forward input tuple to the user and pause DP thread + handleExecutorException(e) + } + } + def sendECMToDataChannels( - method: MethodDescriptor[EmptyRequest, EmptyReturn], - alignment: EmbeddedControlMessageType + method: String, + alignment: EmbeddedControlMessageType, + request: ControlRequest = EmptyRequest() ): Unit = { outputManager.flush() outputGateway.getActiveChannels .filter(!_.isControl) .foreach { activeChannelId => asyncRPCClient.sendECMToChannel( - EmbeddedControlMessageIdentity(method.getBareMethodName), + EmbeddedControlMessageIdentity(method), alignment, Set(), Map( activeChannelId.toWorkerId.name -> ControlInvocation( - method.getBareMethodName, - EmptyRequest(), + method, + request, AsyncRPCContext(ActorVirtualIdentity(""), ActorVirtualIdentity("")), -1 ) diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala index 11fcc77add2..dfb28e838b8 100644 --- a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala +++ b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala @@ -47,6 +47,8 @@ class DataProcessorRPCHandlerInitializer(val dp: DataProcessor) with EndHandler with StartChannelHandler with EndChannelHandler + with EndIterationHandler + with NextIterationHandler with AssignPortHandler with AddInputChannelHandler with FlushNetworkBufferHandler diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/EndChannelHandler.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/EndChannelHandler.scala index 4bb37e2ff28..fc90fc174a8 100644 --- a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/EndChannelHandler.scala +++ b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/EndChannelHandler.scala @@ -25,6 +25,7 @@ import org.apache.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext import org.apache.amber.engine.architecture.rpc.controlreturns.EmptyReturn import org.apache.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer import org.apache.amber.error.ErrorUtils.safely +import org.apache.amber.operator.loop.LoopStartOpExec trait EndChannelHandler { this: DataProcessorRPCHandlerInitializer => @@ -37,19 +38,7 @@ trait EndChannelHandler { val portId = dp.inputGateway.getChannel(channelId).getPortId dp.inputManager.getPort(portId).completed = true dp.inputManager.initBatch(channelId, Array.empty) - try { - val outputState = dp.executor.produceStateOnFinish(portId.id) - if (outputState.isDefined) { - dp.outputManager.emitState(outputState.get) - } - dp.outputManager.outputIterator.setTupleOutput( - dp.executor.onFinishMultiPort(portId.id) - ) - } catch safely { - case e => - // forward input tuple to the user and pause DP thread - dp.handleExecutorException(e) - } + dp.processOnFinish() dp.outputManager.outputIterator.appendSpecialTupleToEnd( FinalizePort(portId, input = true) @@ -60,7 +49,12 @@ trait EndChannelHandler { // See documentation of isMissingOutputPort if (!dp.outputManager.isMissingOutputPort) { // assuming all the output ports finalize after all input ports are finalized. - dp.outputManager.finalizeOutput() + dp.executor match { + case executor: LoopStartOpExec if executor.checkCondition() => + dp.outputManager.finalizeIteration(dp.actorId) + case _ => + dp.outputManager.finalizeOutput() + } } } EmptyReturn() diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala new file mode 100644 index 00000000000..bc6141222e8 --- /dev/null +++ b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.amber.engine.architecture.worker.promisehandlers + +import org.apache.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext, EmptyRequest, EndIterationRequest} +import org.apache.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer +import com.twitter.util.Future +import org.apache.amber.engine.architecture.rpc.controlreturns.EmptyReturn +import org.apache.amber.operator.loop.LoopEndOpExec + +trait EndIterationHandler { + this: DataProcessorRPCHandlerInitializer => + + override def endIteration( + request: EndIterationRequest, + ctx: AsyncRPCContext + ): Future[EmptyReturn] = { + dp.executor match { + case _: LoopEndOpExec => + workerInterface.nextIteration(EmptyRequest(), mkContext(request.worker)) + case _ => + dp.processOnFinish() + dp.outputManager.finalizeIteration(request.worker) + } + EmptyReturn() + } +} \ No newline at end of file diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala new file mode 100644 index 00000000000..ba85dfa0768 --- /dev/null +++ b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.amber.engine.architecture.worker.promisehandlers + +import com.twitter.util.Future +import org.apache.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext, EmptyRequest} +import org.apache.amber.engine.architecture.rpc.controlreturns.EmptyReturn +import org.apache.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer +import org.apache.amber.operator.loop.LoopStartOpExec + +trait NextIterationHandler { + this: DataProcessorRPCHandlerInitializer => + + override def nextIteration( + request: EmptyRequest, + ctx: AsyncRPCContext + ): Future[EmptyReturn] = { + dp.processOnFinish() + if (dp.executor.asInstanceOf[LoopStartOpExec].checkCondition()) { + dp.outputManager.finalizeIteration(dp.actorId) + } else { + dp.outputManager.finalizeOutput() + } + EmptyReturn() + } +} From f119a1020232f73c13906daa2519a1cf77e0eacd Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sat, 8 Nov 2025 21:28:54 -0800 Subject: [PATCH 03/15] update --- .../worker/promisehandlers/StartChannelHandler.scala | 2 +- .../scala/org/apache/amber/operator/loop/LoopEndOpDesc.scala | 2 +- .../scala/org/apache/amber/operator/loop/LoopStartOpDesc.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/StartChannelHandler.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/StartChannelHandler.scala index 222c0a289d7..b129ea7ca86 100644 --- a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/StartChannelHandler.scala +++ b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/StartChannelHandler.scala @@ -35,7 +35,7 @@ trait StartChannelHandler { ctx: AsyncRPCContext ): Future[EmptyReturn] = { val portId = dp.inputGateway.getChannel(dp.inputManager.currentChannelId).getPortId - dp.sendECMToDataChannels(METHOD_START_CHANNEL, NO_ALIGNMENT) + dp.sendECMToDataChannels(METHOD_START_CHANNEL.getBareMethodName, NO_ALIGNMENT) try { val outputState = dp.executor.produceStateOnStart(portId.id) if (outputState.isDefined) { diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpDesc.scala index 8ebe91483c3..473c23c538e 100644 --- a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpDesc.scala +++ b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpDesc.scala @@ -35,7 +35,7 @@ class LoopEndOpDesc extends LogicalOp { workflowId, executionId, operatorIdentifier, - OpExecWithClassName("edu.uci.ics.amber.operator.loop.LoopEndOpExec") + OpExecWithClassName("org.apache.amber.operator.loop.LoopEndOpExec") ) .withInputPorts(operatorInfo.inputPorts) .withOutputPorts(operatorInfo.outputPorts) diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpDesc.scala index 67d7a503c30..5878b04e292 100644 --- a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpDesc.scala +++ b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpDesc.scala @@ -44,7 +44,7 @@ class LoopStartOpDesc extends LogicalOp { executionId, operatorIdentifier, OpExecWithClassName( - "edu.uci.ics.amber.operator.loop.LoopStartOpExec", + "org.apache.amber.operator.loop.LoopStartOpExec", objectMapper.writeValueAsString(this) ) ) From b76f146b633c17515bbbdccf7e5975b8f9378ed2 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Mon, 10 Nov 2025 13:12:52 -0800 Subject: [PATCH 04/15] fix fmt --- .../architecture/worker/DataProcessor.scala | 33 +++++++++++++++---- .../promisehandlers/EndIterationHandler.scala | 14 +++++--- .../NextIterationHandler.scala | 6 ++-- .../org/apache/amber/operator/LogicalOp.scala | 17 ++++++++-- .../amber/operator/loop/LoopEndOpDesc.scala | 8 ++--- .../amber/operator/loop/LoopEndOpExec.scala | 2 +- .../amber/operator/loop/LoopStartOpDesc.scala | 8 ++--- .../amber/operator/loop/LoopStartOpExec.scala | 2 +- 8 files changed, 63 insertions(+), 27 deletions(-) diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/DataProcessor.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/DataProcessor.scala index c6e4f6273e2..bd43385d3e0 100644 --- a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/DataProcessor.scala +++ b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/DataProcessor.scala @@ -24,18 +24,39 @@ import io.grpc.MethodDescriptor import org.apache.amber.core.executor.OperatorExecutor import org.apache.amber.core.state.State import org.apache.amber.core.tuple._ -import org.apache.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity, EmbeddedControlMessageIdentity} +import org.apache.amber.core.virtualidentity.{ + ActorVirtualIdentity, + ChannelIdentity, + EmbeddedControlMessageIdentity +} import org.apache.amber.core.workflow.PortIdentity import org.apache.amber.engine.architecture.common.AmberProcessor import org.apache.amber.engine.architecture.logreplay.ReplayLogManager -import org.apache.amber.engine.architecture.messaginglayer.{InputManager, OutputManager, WorkerTimerService} -import org.apache.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.{NO_ALIGNMENT, PORT_ALIGNMENT} +import org.apache.amber.engine.architecture.messaginglayer.{ + InputManager, + OutputManager, + WorkerTimerService +} +import org.apache.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.{ + NO_ALIGNMENT, + PORT_ALIGNMENT +} import org.apache.amber.engine.architecture.rpc.controlcommands._ import org.apache.amber.engine.architecture.rpc.controlreturns.EmptyReturn -import org.apache.amber.engine.architecture.rpc.workerservice.WorkerServiceGrpc.{METHOD_END_CHANNEL, METHOD_END_ITERATION} -import org.apache.amber.engine.architecture.worker.WorkflowWorker.{DPInputQueueElement, MainThreadDelegateMessage} +import org.apache.amber.engine.architecture.rpc.workerservice.WorkerServiceGrpc.{ + METHOD_END_CHANNEL, + METHOD_END_ITERATION +} +import org.apache.amber.engine.architecture.worker.WorkflowWorker.{ + DPInputQueueElement, + MainThreadDelegateMessage +} import org.apache.amber.engine.architecture.worker.managers.SerializationManager -import org.apache.amber.engine.architecture.worker.statistics.WorkerState.{COMPLETED, READY, RUNNING} +import org.apache.amber.engine.architecture.worker.statistics.WorkerState.{ + COMPLETED, + READY, + RUNNING +} import org.apache.amber.engine.architecture.worker.statistics.WorkerStatistics import org.apache.amber.engine.common.ambermessage._ import org.apache.amber.engine.common.statetransition.WorkerStateManager diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala index bc6141222e8..def9be74bab 100644 --- a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala +++ b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala @@ -19,7 +19,11 @@ package org.apache.amber.engine.architecture.worker.promisehandlers -import org.apache.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext, EmptyRequest, EndIterationRequest} +import org.apache.amber.engine.architecture.rpc.controlcommands.{ + AsyncRPCContext, + EmptyRequest, + EndIterationRequest +} import org.apache.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer import com.twitter.util.Future import org.apache.amber.engine.architecture.rpc.controlreturns.EmptyReturn @@ -29,9 +33,9 @@ trait EndIterationHandler { this: DataProcessorRPCHandlerInitializer => override def endIteration( - request: EndIterationRequest, - ctx: AsyncRPCContext - ): Future[EmptyReturn] = { + request: EndIterationRequest, + ctx: AsyncRPCContext + ): Future[EmptyReturn] = { dp.executor match { case _: LoopEndOpExec => workerInterface.nextIteration(EmptyRequest(), mkContext(request.worker)) @@ -41,4 +45,4 @@ trait EndIterationHandler { } EmptyReturn() } -} \ No newline at end of file +} diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala index ba85dfa0768..d4f6bdffbac 100644 --- a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala +++ b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala @@ -29,9 +29,9 @@ trait NextIterationHandler { this: DataProcessorRPCHandlerInitializer => override def nextIteration( - request: EmptyRequest, - ctx: AsyncRPCContext - ): Future[EmptyReturn] = { + request: EmptyRequest, + ctx: AsyncRPCContext + ): Future[EmptyReturn] = { dp.processOnFinish() if (dp.executor.asInstanceOf[LoopStartOpExec].checkCondition()) { dp.outputManager.finalizeIteration(dp.actorId) diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/LogicalOp.scala b/common/workflow-operator/src/main/scala/org/apache/amber/operator/LogicalOp.scala index 05cf952610a..777e0a60889 100644 --- a/common/workflow-operator/src/main/scala/org/apache/amber/operator/LogicalOp.scala +++ b/common/workflow-operator/src/main/scala/org/apache/amber/operator/LogicalOp.scala @@ -35,7 +35,12 @@ import org.apache.amber.operator.distinct.DistinctOpDesc import org.apache.amber.operator.dummy.DummyOpDesc import org.apache.amber.operator.filter.SpecializedFilterOpDesc import org.apache.amber.operator.hashJoin.HashJoinOpDesc -import org.apache.amber.operator.huggingFace.{HuggingFaceIrisLogisticRegressionOpDesc, HuggingFaceSentimentAnalysisOpDesc, HuggingFaceSpamSMSDetectionOpDesc, HuggingFaceTextSummarizationOpDesc} +import org.apache.amber.operator.huggingFace.{ + HuggingFaceIrisLogisticRegressionOpDesc, + HuggingFaceSentimentAnalysisOpDesc, + HuggingFaceSpamSMSDetectionOpDesc, + HuggingFaceTextSummarizationOpDesc +} import org.apache.amber.operator.ifStatement.IfOpDesc import org.apache.amber.operator.intersect.IntersectOpDesc import org.apache.amber.operator.intervalJoin.IntervalJoinOpDesc @@ -43,7 +48,10 @@ import org.apache.amber.operator.keywordSearch.KeywordSearchOpDesc import org.apache.amber.operator.limit.LimitOpDesc import org.apache.amber.operator.loop.{LoopEndOpDesc, LoopStartOpDesc} import org.apache.amber.operator.machineLearning.Scorer.MachineLearningScorerOpDesc -import org.apache.amber.operator.machineLearning.sklearnAdvanced.KNNTrainer.{SklearnAdvancedKNNClassifierTrainerOpDesc, SklearnAdvancedKNNRegressorTrainerOpDesc} +import org.apache.amber.operator.machineLearning.sklearnAdvanced.KNNTrainer.{ + SklearnAdvancedKNNClassifierTrainerOpDesc, + SklearnAdvancedKNNRegressorTrainerOpDesc +} import org.apache.amber.operator.machineLearning.sklearnAdvanced.SVCTrainer.SklearnAdvancedSVCTrainerOpDesc import org.apache.amber.operator.machineLearning.sklearnAdvanced.SVRTrainer.SklearnAdvancedSVRTrainerOpDesc import org.apache.amber.operator.metadata.{OPVersion, OperatorInfo, PropertyNameConstants} @@ -57,7 +65,10 @@ import org.apache.amber.operator.sleep.SleepOpDesc import org.apache.amber.operator.sort.{SortOpDesc, StableMergeSortOpDesc} import org.apache.amber.operator.sortPartitions.SortPartitionsOpDesc import org.apache.amber.operator.source.apis.reddit.RedditSearchSourceOpDesc -import org.apache.amber.operator.source.apis.twitter.v2.{TwitterFullArchiveSearchSourceOpDesc, TwitterSearchSourceOpDesc} +import org.apache.amber.operator.source.apis.twitter.v2.{ + TwitterFullArchiveSearchSourceOpDesc, + TwitterSearchSourceOpDesc +} import org.apache.amber.operator.source.fetcher.URLFetcherOpDesc import org.apache.amber.operator.source.scan.FileScanSourceOpDesc import org.apache.amber.operator.source.scan.arrow.ArrowSourceOpDesc diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpDesc.scala index 473c23c538e..50decd90682 100644 --- a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpDesc.scala +++ b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpDesc.scala @@ -27,9 +27,9 @@ import org.apache.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} class LoopEndOpDesc extends LogicalOp { override def getPhysicalOp( - workflowId: WorkflowIdentity, - executionId: ExecutionIdentity - ): PhysicalOp = { + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = { PhysicalOp .oneToOnePhysicalOp( workflowId, @@ -50,4 +50,4 @@ class LoopEndOpDesc extends LogicalOp { inputPorts = List(InputPort()), outputPorts = List(OutputPort()) ) -} \ No newline at end of file +} diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpExec.scala index a98081f1814..90a4d7dc754 100644 --- a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpExec.scala +++ b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpExec.scala @@ -5,4 +5,4 @@ import org.apache.amber.core.tuple.{Tuple, TupleLike} class LoopEndOpExec extends OperatorExecutor { override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = Iterator(tuple) -} \ No newline at end of file +} diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpDesc.scala index 5878b04e292..f01ce526466 100644 --- a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpDesc.scala +++ b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpDesc.scala @@ -35,9 +35,9 @@ class LoopStartOpDesc extends LogicalOp { var iteration: Int = _ override def getPhysicalOp( - workflowId: WorkflowIdentity, - executionId: ExecutionIdentity - ): PhysicalOp = { + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = { PhysicalOp .oneToOnePhysicalOp( workflowId, @@ -63,4 +63,4 @@ class LoopStartOpDesc extends LogicalOp { outputPorts = List(OutputPort()) ) -} \ No newline at end of file +} diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpExec.scala index cd9698701ed..24ce3c0c0df 100644 --- a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpExec.scala +++ b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpExec.scala @@ -44,4 +44,4 @@ class LoopStartOpExec(descString: String) extends OperatorExecutor { data.iterator } -} \ No newline at end of file +} From 9048c40cea48f207d0ad748256e3a2035e68b044 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Wed, 21 Jan 2026 18:55:26 -0800 Subject: [PATCH 05/15] fix fmt --- .../promisehandlers/EndIterationHandler.scala | 10 +++++----- .../promisehandlers/NextIterationHandler.scala | 10 +++++----- .../apache/amber/operator/loop/LoopEndOpExec.scala | 8 -------- .../amber/operator/loop/LoopEndOpDesc.scala | 12 ++++++------ .../texera/amber/operator/loop/LoopEndOpExec.scala | 8 ++++++++ .../amber/operator/loop/LoopStartOpDesc.scala | 14 +++++++------- .../amber/operator/loop/LoopStartOpExec.scala | 8 ++++---- 7 files changed, 35 insertions(+), 35 deletions(-) delete mode 100644 common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpExec.scala rename common/workflow-operator/src/main/scala/org/apache/{ => texera}/amber/operator/loop/LoopEndOpDesc.scala (78%) create mode 100644 common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala rename common/workflow-operator/src/main/scala/org/apache/{ => texera}/amber/operator/loop/LoopStartOpDesc.scala (79%) rename common/workflow-operator/src/main/scala/org/apache/{ => texera}/amber/operator/loop/LoopStartOpExec.scala (86%) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala index def9be74bab..592b3fb5e19 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala @@ -17,17 +17,17 @@ * under the License. */ -package org.apache.amber.engine.architecture.worker.promisehandlers +package org.apache.texera.amber.engine.architecture.worker.promisehandlers -import org.apache.amber.engine.architecture.rpc.controlcommands.{ +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ AsyncRPCContext, EmptyRequest, EndIterationRequest } -import org.apache.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer +import org.apache.texera.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer import com.twitter.util.Future -import org.apache.amber.engine.architecture.rpc.controlreturns.EmptyReturn -import org.apache.amber.operator.loop.LoopEndOpExec +import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn +import org.apache.texera.amber.operator.loop.LoopEndOpExec trait EndIterationHandler { this: DataProcessorRPCHandlerInitializer => diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala index d4f6bdffbac..366d9a2ab78 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala @@ -17,13 +17,13 @@ * under the License. */ -package org.apache.amber.engine.architecture.worker.promisehandlers +package org.apache.texera.amber.engine.architecture.worker.promisehandlers import com.twitter.util.Future -import org.apache.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext, EmptyRequest} -import org.apache.amber.engine.architecture.rpc.controlreturns.EmptyReturn -import org.apache.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer -import org.apache.amber.operator.loop.LoopStartOpExec +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext, EmptyRequest} +import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn +import org.apache.texera.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer +import org.apache.texera.amber.operator.loop.LoopStartOpExec trait NextIterationHandler { this: DataProcessorRPCHandlerInitializer => diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpExec.scala deleted file mode 100644 index 90a4d7dc754..00000000000 --- a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpExec.scala +++ /dev/null @@ -1,8 +0,0 @@ -package org.apache.amber.operator.loop - -import org.apache.amber.core.executor.OperatorExecutor -import org.apache.amber.core.tuple.{Tuple, TupleLike} - -class LoopEndOpExec extends OperatorExecutor { - override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = Iterator(tuple) -} diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala similarity index 78% rename from common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpDesc.scala rename to common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala index 50decd90682..0657c6ce42b 100644 --- a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpDesc.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala @@ -17,13 +17,13 @@ * under the License. */ -package org.apache.amber.operator.loop +package org.apache.texera.amber.operator.loop -import org.apache.amber.core.executor.OpExecWithClassName -import org.apache.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} -import org.apache.amber.core.workflow.{InputPort, OutputPort, PhysicalOp} -import org.apache.amber.operator.LogicalOp -import org.apache.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PhysicalOp} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} class LoopEndOpDesc extends LogicalOp { override def getPhysicalOp( diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala new file mode 100644 index 00000000000..60f18cd5fc9 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala @@ -0,0 +1,8 @@ +package org.apache.texera.amber.operator.loop + +import org.apache.texera.amber.core.executor.OperatorExecutor +import org.apache.texera.amber.core.tuple.{Tuple, TupleLike} + +class LoopEndOpExec extends OperatorExecutor { + override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = Iterator(tuple) +} diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala similarity index 79% rename from common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpDesc.scala rename to common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala index f01ce526466..fa52f68278b 100644 --- a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpDesc.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala @@ -17,16 +17,16 @@ * under the License. */ -package org.apache.amber.operator.loop +package org.apache.texera.amber.operator.loop import com.fasterxml.jackson.annotation.JsonProperty import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle -import org.apache.amber.core.executor.OpExecWithClassName -import org.apache.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} -import org.apache.amber.core.workflow.{InputPort, OutputPort, PhysicalOp} -import org.apache.amber.operator.LogicalOp -import org.apache.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} -import org.apache.amber.util.JSONUtils.objectMapper +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PhysicalOp} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} +import org.apache.texera.amber.util.JSONUtils.objectMapper class LoopStartOpDesc extends LogicalOp { diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpExec.scala similarity index 86% rename from common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpExec.scala rename to common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpExec.scala index 24ce3c0c0df..68c5bcc0e3e 100644 --- a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpExec.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpExec.scala @@ -17,11 +17,11 @@ * under the License. */ -package org.apache.amber.operator.loop +package org.apache.texera.amber.operator.loop -import org.apache.amber.core.executor.OperatorExecutor -import org.apache.amber.core.tuple.{Tuple, TupleLike} -import org.apache.amber.util.JSONUtils.objectMapper +import org.apache.texera.amber.core.executor.OperatorExecutor +import org.apache.texera.amber.core.tuple.{Tuple, TupleLike} +import org.apache.texera.amber.util.JSONUtils.objectMapper import scala.collection.mutable From 05ba749c640130b263f7fccca5d9e105f0695574 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Wed, 21 Jan 2026 18:55:48 -0800 Subject: [PATCH 06/15] fix fmt --- .../worker/promisehandlers/NextIterationHandler.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala index 366d9a2ab78..ed8cc475d6f 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala @@ -20,7 +20,10 @@ package org.apache.texera.amber.engine.architecture.worker.promisehandlers import com.twitter.util.Future -import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext, EmptyRequest} +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ + AsyncRPCContext, + EmptyRequest +} import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn import org.apache.texera.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer import org.apache.texera.amber.operator.loop.LoopStartOpExec From 4b2841222306fb069c7d597853ea5aaf602e2113 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sat, 24 Jan 2026 21:57:32 -0800 Subject: [PATCH 07/15] fix fmt --- .../org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala | 2 +- .../org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala index 0657c6ce42b..f56068e9036 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala @@ -35,7 +35,7 @@ class LoopEndOpDesc extends LogicalOp { workflowId, executionId, operatorIdentifier, - OpExecWithClassName("org.apache.amber.operator.loop.LoopEndOpExec") + OpExecWithClassName("org.apache.texera.amber.operator.loop.LoopEndOpExec") ) .withInputPorts(operatorInfo.inputPorts) .withOutputPorts(operatorInfo.outputPorts) diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala index fa52f68278b..d0c30256f33 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala @@ -44,7 +44,7 @@ class LoopStartOpDesc extends LogicalOp { executionId, operatorIdentifier, OpExecWithClassName( - "org.apache.amber.operator.loop.LoopStartOpExec", + "org.apache.texera.amber.operator.loop.LoopStartOpExec", objectMapper.writeValueAsString(this) ) ) From d6edcfd23c842c003a91c4d887811c8fe11f07bc Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Tue, 27 Jan 2026 23:53:36 -0800 Subject: [PATCH 08/15] init --- .../messaginglayer/OutputManager.scala | 27 ++++++++++--------- .../architecture/worker/DataProcessor.scala | 4 ++- ...InputPortMaterializationReaderThread.scala | 10 +++++++ .../OutputPortResultWriterThread.scala | 18 +++++++++---- common/config/src/main/resources/storage.conf | 3 +++ .../texera/amber/config/StorageConfig.scala | 2 ++ .../amber/core/storage/DocumentFactory.scala | 2 ++ .../amber/core/storage/VFSURIFactory.scala | 1 + .../core/storage/result/ResultSchema.scala | 4 +++ 9 files changed, 52 insertions(+), 19 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala index 9a5ff224399..1fa43cda4a7 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala @@ -20,22 +20,16 @@ package org.apache.texera.amber.engine.architecture.messaginglayer import org.apache.texera.amber.core.state.State -import org.apache.texera.amber.core.storage.DocumentFactory +import org.apache.texera.amber.core.storage.{DocumentFactory, VFSResourceType} import org.apache.texera.amber.core.storage.model.BufferedItemWriter +import org.apache.texera.amber.core.storage.result.ResultSchema import org.apache.texera.amber.core.tuple._ import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity} import org.apache.texera.amber.core.workflow.{PhysicalLink, PortIdentity} -import org.apache.texera.amber.engine.architecture.messaginglayer.OutputManager.{ - DPOutputIterator, - getBatchSize, - toPartitioner -} +import org.apache.texera.amber.engine.architecture.messaginglayer.OutputManager.{DPOutputIterator, getBatchSize, toPartitioner} import org.apache.texera.amber.engine.architecture.sendsemantics.partitioners._ import org.apache.texera.amber.engine.architecture.sendsemantics.partitionings._ -import org.apache.texera.amber.engine.architecture.worker.managers.{ - OutputPortResultWriterThread, - PortStorageWriterTerminateSignal -} +import org.apache.texera.amber.engine.architecture.worker.managers.{OutputPortResultWriterThread, PortStorageWriterTerminateSignal} import org.apache.texera.amber.engine.common.AmberLogging import org.apache.texera.amber.util.VirtualIdentityUtils @@ -215,7 +209,7 @@ class OutputManager( * @param outputPortId If not specified, the tuple will be written to all output ports that need storage. */ def saveTupleToStorageIfNeeded( - tuple: Tuple, + tuple: Either[Tuple, String], outputPortId: Option[PortIdentity] = None ): Unit = { (outputPortId match { @@ -284,12 +278,19 @@ class OutputManager( } private def setupOutputStorageWriterThread(portId: PortIdentity, storageUri: URI): Unit = { - val bufferedItemWriter = DocumentFactory + val bufferedTupleWriter = DocumentFactory .openDocument(storageUri) ._1 .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString) .asInstanceOf[BufferedItemWriter[Tuple]] - val writerThread = new OutputPortResultWriterThread(bufferedItemWriter) + + val ecmUri = storageUri.resolve("ecm") + val bufferedECMWriter = DocumentFactory + .createDocument(ecmUri, ResultSchema.ecmSchema) + .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString) + .asInstanceOf[BufferedItemWriter[Tuple]] + + val writerThread = new OutputPortResultWriterThread(bufferedTupleWriter, bufferedECMWriter) this.outputPortResultWriterThreads(portId) = writerThread writerThread.start() } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala index dae34468260..1ef9dd7d99e 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala @@ -188,13 +188,15 @@ class DataProcessor( PORT_ALIGNMENT, EndIterationRequest(worker) ) + executor.reset() case schemaEnforceable: SchemaEnforceable => val portIdentity = outputPortOpt.getOrElse(outputManager.getSingleOutputPortIdentity) val tuple = schemaEnforceable.enforceSchema(outputManager.getPort(portIdentity).schema) statisticsManager.increaseOutputStatistics(portIdentity, tuple.inMemSize) outputManager.passTupleToDownstream(tuple, outputPortOpt) - outputManager.saveTupleToStorageIfNeeded(tuple, outputPortOpt) + outputManager.saveTupleToStorageIfNeeded(Right(actorId.toString), outputPortOpt) + outputManager.saveTupleToStorageIfNeeded(Left(tuple), outputPortOpt) case other => // skip for now } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala index 10fbbc44a2c..953bc677af1 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala @@ -84,6 +84,16 @@ class InputPortMaterializationReaderThread( // Notify the input port of start of input channel emitECM(METHOD_START_CHANNEL, NO_ALIGNMENT) try { + val ecm: VirtualDocument[Tuple] = DocumentFactory + .openDocument(uri.resolve("ecm")) + ._1 + .asInstanceOf[VirtualDocument[Tuple]] + val ecmReadIterator = ecm.get() + if (ecmReadIterator.hasNext) { + val tuple = ecmReadIterator.next() + println("Received ECM tuple: " + tuple) + } + val materialization: VirtualDocument[Tuple] = DocumentFactory .openDocument(uri) ._1 diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala index 28e5d2af667..f8d0710aa88 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala @@ -22,6 +22,7 @@ package org.apache.texera.amber.engine.architecture.worker.managers import com.google.common.collect.Queues import org.apache.texera.amber.core.storage.model.BufferedItemWriter import org.apache.texera.amber.core.tuple.Tuple +import org.apache.texera.amber.core.storage.result.ResultSchema import java.util.concurrent.LinkedBlockingQueue @@ -29,21 +30,28 @@ sealed trait TerminateSignal case object PortStorageWriterTerminateSignal extends TerminateSignal class OutputPortResultWriterThread( - bufferedItemWriter: BufferedItemWriter[Tuple] + bufferedTupleWriter: BufferedItemWriter[Tuple], + bufferedECMWriter: BufferedItemWriter[Tuple] ) extends Thread { - val queue: LinkedBlockingQueue[Either[Tuple, TerminateSignal]] = - Queues.newLinkedBlockingQueue[Either[Tuple, TerminateSignal]]() + val queue: LinkedBlockingQueue[Either[Either[Tuple, String], TerminateSignal]] = + Queues.newLinkedBlockingQueue[Either[Either[Tuple, String], TerminateSignal]]() override def run(): Unit = { var internalStop = false while (!internalStop) { val queueContent = queue.take() queueContent match { - case Left(tuple) => bufferedItemWriter.putOne(tuple) + case Left(item) => item match { + case Left(tuple) => bufferedTupleWriter.putOne(tuple) + case Right(ecm) => + val ecmTuple = new Tuple(ResultSchema.ecmSchema, Array(ecm)) + bufferedECMWriter.putOne(ecmTuple) + } case Right(_) => internalStop = true } } - bufferedItemWriter.close() + bufferedTupleWriter.close() + bufferedECMWriter.close() } } diff --git a/common/config/src/main/resources/storage.conf b/common/config/src/main/resources/storage.conf index 85a62b77a3b..a158eb87530 100644 --- a/common/config/src/main/resources/storage.conf +++ b/common/config/src/main/resources/storage.conf @@ -52,6 +52,9 @@ storage { runtime-statistics-namespace = "workflow-runtime-statistics" runtime-statistics-namespace = ${?STORAGE_ICEBERG_TABLE_RUNTIME_STATISTICS_NAMESPACE} + ecm-namespace = "ecm" + ecm-namespace = ${?STORAGE_ICEBERG_TABLE_ECM_NAMESPACE} + commit { batch-size = 4096 # decide the buffer size of our IcebergTableWriter batch-size = ${?STORAGE_ICEBERG_TABLE_COMMIT_BATCH_SIZE} diff --git a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala index c5bd3302862..8a1aba73b76 100644 --- a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala +++ b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala @@ -63,6 +63,7 @@ object StorageConfig { conf.getInt("storage.iceberg.table.commit.retry.min-wait-ms") val icebergTableCommitMaxRetryWaitMs: Int = conf.getInt("storage.iceberg.table.commit.retry.max-wait-ms") + val icebergTableECMNamespace: String = conf.getString("storage.iceberg.table.ecm-namespace") // LakeFS specifics // lakefsEndpoint is a var because in test we need to override it to point to the test container @@ -116,6 +117,7 @@ object StorageConfig { val ENV_ICEBERG_TABLE_COMMIT_NUM_RETRIES = "STORAGE_ICEBERG_TABLE_COMMIT_NUM_RETRIES" val ENV_ICEBERG_TABLE_COMMIT_MIN_WAIT_MS = "STORAGE_ICEBERG_TABLE_COMMIT_MIN_WAIT_MS" val ENV_ICEBERG_TABLE_COMMIT_MAX_WAIT_MS = "STORAGE_ICEBERG_TABLE_COMMIT_MAX_WAIT_MS" + val ENV_ICEBERG_TABLE_ECM_NAMESPACE = "STORAGE_ICEBERG_TABLE_ECM_NAMESPACE" // LakeFS val ENV_LAKEFS_ENDPOINT = "STORAGE_LAKEFS_ENDPOINT" diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala index 4c37c33bb20..5b27170560f 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala @@ -72,6 +72,7 @@ object DocumentFactory { case RESULT => StorageConfig.icebergTableResultNamespace case CONSOLE_MESSAGES => StorageConfig.icebergTableConsoleMessagesNamespace case RUNTIME_STATISTICS => StorageConfig.icebergTableRuntimeStatisticsNamespace + case ECM => StorageConfig.icebergTableECMNamespace case _ => throw new IllegalArgumentException(s"Resource type $resourceType is not supported") } @@ -126,6 +127,7 @@ object DocumentFactory { case RESULT => StorageConfig.icebergTableResultNamespace case CONSOLE_MESSAGES => StorageConfig.icebergTableConsoleMessagesNamespace case RUNTIME_STATISTICS => StorageConfig.icebergTableRuntimeStatisticsNamespace + case ECM => StorageConfig.icebergTableECMNamespace case _ => throw new IllegalArgumentException(s"Resource type $resourceType is not supported") } diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala index 3513ac5ecd8..f4f3a9e4bf1 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala @@ -34,6 +34,7 @@ object VFSResourceType extends Enumeration { val RESULT: Value = Value("result") val RUNTIME_STATISTICS: Value = Value("runtimeStatistics") val CONSOLE_MESSAGES: Value = Value("consoleMessages") + val ECM: Value = Value("ecm") } object VFSURIFactory { diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/ResultSchema.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/ResultSchema.scala index ade33283f7f..a3978d0c714 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/ResultSchema.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/ResultSchema.scala @@ -39,4 +39,8 @@ object ResultSchema { val consoleMessagesSchema: Schema = new Schema( new Attribute("message", AttributeType.STRING) ) + + val ecmSchema: Schema = new Schema( + new Attribute("workerId", AttributeType.STRING) + ) } From 4b48e8c70ad8809e64bf34334c3f55cd5cc20e46 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Wed, 28 Jan 2026 18:51:12 -0800 Subject: [PATCH 09/15] update --- .../architecture/messaginglayer/OutputManager.scala | 5 ++++- .../engine/architecture/worker/DataProcessor.scala | 10 +++++----- .../org/apache/texera/amber/core/tuple/TupleLike.scala | 2 +- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala index 1fa43cda4a7..0bc826c42df 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala @@ -255,7 +255,10 @@ class OutputManager( } def finalizeIteration(worker: ActorVirtualIdentity): Unit = { - outputIterator.appendSpecialTupleToEnd(FinalizeIteration(worker)) + this.ports.keys + .foreach(outputPortId => + outputIterator.appendSpecialTupleToEnd(FinalizeIteration(outputPortId, worker)) + ) } /** diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala index 1ef9dd7d99e..86c4c35b751 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala @@ -174,7 +174,7 @@ class DataProcessor( EmptyRequest(), asyncRPCClient.mkContext(CONTROLLER) ) - case FinalizePort(portId, input) => + case FinalizePort(portId: PortIdentity, input: Boolean) => if (!input) { outputManager.closeOutputStorageWriterIfNeeded(portId) } @@ -182,20 +182,20 @@ class DataProcessor( PortCompletedRequest(portId, input), asyncRPCClient.mkContext(CONTROLLER) ) - case FinalizeIteration(worker: ActorVirtualIdentity) => + case FinalizeIteration(portId: PortIdentity, worker: ActorVirtualIdentity) => sendECMToDataChannels( METHOD_END_ITERATION.getBareMethodName, PORT_ALIGNMENT, EndIterationRequest(worker) ) - + outputManager.saveTupleToStorageIfNeeded(Right(actorId.toString), outputPortOpt) + outputManager.closeOutputStorageWriterIfNeeded(portId) + asyncRPCClient.controllerInterface.portCompleted(PortCompletedRequest(portId, input = false), asyncRPCClient.mkContext(CONTROLLER)) // fix this line, add iteration completed rpc executor.reset() case schemaEnforceable: SchemaEnforceable => val portIdentity = outputPortOpt.getOrElse(outputManager.getSingleOutputPortIdentity) val tuple = schemaEnforceable.enforceSchema(outputManager.getPort(portIdentity).schema) statisticsManager.increaseOutputStatistics(portIdentity, tuple.inMemSize) - outputManager.passTupleToDownstream(tuple, outputPortOpt) - outputManager.saveTupleToStorageIfNeeded(Right(actorId.toString), outputPortOpt) outputManager.saveTupleToStorageIfNeeded(Left(tuple), outputPortOpt) case other => // skip for now diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/TupleLike.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/TupleLike.scala index c166920ad4c..cbdf8591bde 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/TupleLike.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/TupleLike.scala @@ -42,7 +42,7 @@ trait InternalMarker extends TupleLike { final case class FinalizePort(portId: PortIdentity, input: Boolean) extends InternalMarker final case class FinalizeExecutor() extends InternalMarker -final case class FinalizeIteration(worker: ActorVirtualIdentity) extends InternalMarker +final case class FinalizeIteration(portId: PortIdentity, worker: ActorVirtualIdentity) extends InternalMarker trait SeqTupleLike extends TupleLike with SchemaEnforceable { override def inMemSize: Long = ??? From ac507fed939c55dafcbe973951a3428432ffda7c Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Thu, 29 Jan 2026 03:58:20 -0800 Subject: [PATCH 10/15] update --- .../architecture/worker/DataProcessor.scala | 7 ++- ...InputPortMaterializationReaderThread.scala | 50 ++++++++----------- .../promisehandlers/EndIterationHandler.scala | 27 +++++++--- 3 files changed, 48 insertions(+), 36 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala index 86c4c35b751..bd5a47535d4 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala @@ -183,12 +183,13 @@ class DataProcessor( asyncRPCClient.mkContext(CONTROLLER) ) case FinalizeIteration(portId: PortIdentity, worker: ActorVirtualIdentity) => + println(s"FinalizeIteration received at worker $actorId for port $portId") sendECMToDataChannels( METHOD_END_ITERATION.getBareMethodName, PORT_ALIGNMENT, EndIterationRequest(worker) ) - outputManager.saveTupleToStorageIfNeeded(Right(actorId.toString), outputPortOpt) + outputManager.saveTupleToStorageIfNeeded(Right(actorId.name), outputPortOpt) outputManager.closeOutputStorageWriterIfNeeded(portId) asyncRPCClient.controllerInterface.portCompleted(PortCompletedRequest(portId, input = false), asyncRPCClient.mkContext(CONTROLLER)) // fix this line, add iteration completed rpc executor.reset() @@ -246,6 +247,10 @@ class DataProcessor( inputManager.currentChannelId = channelId val command = ecm.commandMapping.get(actorId.name) logger.info(s"receive ECM from $channelId, id = ${ecm.id}, cmd = $command") + asyncRPCClient.controllerInterface.consoleMessageTriggered( + ConsoleMessageTriggeredRequest(mkConsoleMessage(actorId, s"receive ECM from $channelId, id = ${ecm.id}, cmd = $command")), + asyncRPCClient.mkContext(CONTROLLER) + ) if (ecm.ecmType != NO_ALIGNMENT) { pauseManager.pauseInputChannel(ECMPause(ecm.id), List(channelId)) } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala index 953bc677af1..255339004a8 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala @@ -24,27 +24,14 @@ import org.apache.texera.amber.config.ApplicationConfig import org.apache.texera.amber.core.storage.DocumentFactory import org.apache.texera.amber.core.storage.model.VirtualDocument import org.apache.texera.amber.core.tuple.Tuple -import org.apache.texera.amber.core.virtualidentity.{ - ActorVirtualIdentity, - ChannelIdentity, - EmbeddedControlMessageIdentity -} +import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity, EmbeddedControlMessageIdentity} import org.apache.texera.amber.engine.architecture.messaginglayer.OutputManager.toPartitioner -import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.{ - NO_ALIGNMENT, - PORT_ALIGNMENT -} +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.{NO_ALIGNMENT, PORT_ALIGNMENT} import org.apache.texera.amber.engine.architecture.rpc.controlcommands._ import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn -import org.apache.texera.amber.engine.architecture.rpc.workerservice.WorkerServiceGrpc.{ - METHOD_END_CHANNEL, - METHOD_START_CHANNEL -} +import org.apache.texera.amber.engine.architecture.rpc.workerservice.WorkerServiceGrpc.{METHOD_END_CHANNEL, METHOD_END_ITERATION, METHOD_START_CHANNEL} import org.apache.texera.amber.engine.architecture.sendsemantics.partitionings.Partitioning -import org.apache.texera.amber.engine.architecture.worker.WorkflowWorker.{ - DPInputQueueElement, - FIFOMessageElement -} +import org.apache.texera.amber.engine.architecture.worker.WorkflowWorker.{DPInputQueueElement, FIFOMessageElement} import org.apache.texera.amber.engine.common.ambermessage.{DataFrame, WorkflowFIFOMessage} import org.apache.texera.amber.util.VirtualIdentityUtils.getFromActorIdForInputPortStorage @@ -82,17 +69,14 @@ class InputPortMaterializationReaderThread( */ override def run(): Unit = { // Notify the input port of start of input channel - emitECM(METHOD_START_CHANNEL, NO_ALIGNMENT) + emitECM(METHOD_START_CHANNEL.getBareMethodName, NO_ALIGNMENT) try { val ecm: VirtualDocument[Tuple] = DocumentFactory .openDocument(uri.resolve("ecm")) ._1 .asInstanceOf[VirtualDocument[Tuple]] val ecmReadIterator = ecm.get() - if (ecmReadIterator.hasNext) { - val tuple = ecmReadIterator.next() - println("Received ECM tuple: " + tuple) - } + val materialization: VirtualDocument[Tuple] = DocumentFactory .openDocument(uri) @@ -116,7 +100,15 @@ class InputPortMaterializationReaderThread( } // Flush any remaining tuples in the buffer. if (buffer.nonEmpty) flush() - emitECM(METHOD_END_CHANNEL, PORT_ALIGNMENT) + + + if (ecmReadIterator.hasNext) { + val tuple = ecmReadIterator.next() + println("Received ECM tuple: " + tuple.getField("workerId")) + emitECM(METHOD_END_ITERATION.getBareMethodName, NO_ALIGNMENT, EndIterationRequest(ActorVirtualIdentity(tuple.getField("workerId")))) + } else { + emitECM(METHOD_END_CHANNEL.getBareMethodName, PORT_ALIGNMENT) + } isFinished.set(true) } catch { case e: Exception => @@ -128,19 +120,21 @@ class InputPortMaterializationReaderThread( * Puts an ECM into the internal queue. */ private def emitECM( - method: MethodDescriptor[EmptyRequest, EmptyReturn], - alignment: EmbeddedControlMessageType + method: String, + alignment: EmbeddedControlMessageType, + request: ControlRequest = EmptyRequest() + ): Unit = { flush() val ecm = EmbeddedControlMessage( - EmbeddedControlMessageIdentity(method.getBareMethodName), + EmbeddedControlMessageIdentity(method), alignment, Seq(), Map( workerActorId.name -> ControlInvocation( - method.getBareMethodName, - EmptyRequest(), + method, + request, AsyncRPCContext(ActorVirtualIdentity(""), ActorVirtualIdentity("")), -1 ) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala index 592b3fb5e19..d38cf9854f3 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala @@ -19,15 +19,12 @@ package org.apache.texera.amber.engine.architecture.worker.promisehandlers -import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ - AsyncRPCContext, - EmptyRequest, - EndIterationRequest -} +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext, EmptyRequest, EndIterationRequest} import org.apache.texera.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer import com.twitter.util.Future +import org.apache.texera.amber.core.tuple.FinalizePort import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn -import org.apache.texera.amber.operator.loop.LoopEndOpExec +import org.apache.texera.amber.operator.loop.{LoopEndOpExec, LoopStartOpExec} trait EndIterationHandler { this: DataProcessorRPCHandlerInitializer => @@ -40,8 +37,24 @@ trait EndIterationHandler { case _: LoopEndOpExec => workerInterface.nextIteration(EmptyRequest(), mkContext(request.worker)) case _ => + val channelId = dp.inputManager.currentChannelId + val portId = dp.inputGateway.getChannel(channelId).getPortId + dp.inputManager.getPort(portId).completed = true + dp.inputManager.initBatch(channelId, Array.empty) dp.processOnFinish() - dp.outputManager.finalizeIteration(request.worker) + + dp.outputManager.outputIterator.appendSpecialTupleToEnd( + FinalizePort(portId, input = true) + ) + + if (dp.inputManager.getAllPorts.forall(portId => dp.inputManager.isPortCompleted(portId))) { + // Need this check for handling input port dependency relationships. + // See documentation of isMissingOutputPort + if (!dp.outputManager.isMissingOutputPort) { + // assuming all the output ports finalize after all input ports are finalized. + dp.outputManager.finalizeIteration(request.worker) + } + } } EmptyReturn() } From 703a442302449126ad157e78ac8654272b2960bd Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Thu, 29 Jan 2026 04:11:44 -0800 Subject: [PATCH 11/15] update --- .../architecture/worker/DataProcessor.scala | 42 ++++++------------- 1 file changed, 13 insertions(+), 29 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala index bd5a47535d4..72d9932bcc8 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala @@ -19,50 +19,32 @@ package org.apache.texera.amber.engine.architecture.worker +import com.google.protobuf.timestamp.Timestamp import com.softwaremill.macwire.wire import io.grpc.MethodDescriptor import org.apache.texera.amber.core.executor.OperatorExecutor import org.apache.texera.amber.core.state.State import org.apache.texera.amber.core.tuple._ -import org.apache.texera.amber.core.virtualidentity.{ - ActorVirtualIdentity, - ChannelIdentity, - EmbeddedControlMessageIdentity -} +import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity, EmbeddedControlMessageIdentity} import org.apache.texera.amber.core.workflow.PortIdentity import org.apache.texera.amber.engine.architecture.common.AmberProcessor import org.apache.texera.amber.engine.architecture.logreplay.ReplayLogManager -import org.apache.texera.amber.engine.architecture.messaginglayer.{ - InputManager, - OutputManager, - WorkerTimerService -} -import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.{ - NO_ALIGNMENT, - PORT_ALIGNMENT -} +import org.apache.texera.amber.engine.architecture.messaginglayer.{InputManager, OutputManager, WorkerTimerService} +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.ConsoleMessageType.{ERROR, PRINT} +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.{NO_ALIGNMENT, PORT_ALIGNMENT} import org.apache.texera.amber.engine.architecture.rpc.controlcommands._ import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn -import org.apache.texera.amber.engine.architecture.rpc.workerservice.WorkerServiceGrpc.{ - METHOD_END_CHANNEL, - METHOD_END_ITERATION -} -import org.apache.texera.amber.engine.architecture.worker.WorkflowWorker.{ - DPInputQueueElement, - MainThreadDelegateMessage -} +import org.apache.texera.amber.engine.architecture.rpc.workerservice.WorkerServiceGrpc.{METHOD_END_CHANNEL, METHOD_END_ITERATION} +import org.apache.texera.amber.engine.architecture.worker.WorkflowWorker.{DPInputQueueElement, MainThreadDelegateMessage} import org.apache.texera.amber.engine.architecture.worker.managers.SerializationManager -import org.apache.texera.amber.engine.architecture.worker.statistics.WorkerState.{ - COMPLETED, - READY, - RUNNING -} +import org.apache.texera.amber.engine.architecture.worker.statistics.WorkerState.{COMPLETED, READY, RUNNING} import org.apache.texera.amber.engine.architecture.worker.statistics.WorkerStatistics import org.apache.texera.amber.engine.common.ambermessage._ import org.apache.texera.amber.engine.common.statetransition.WorkerStateManager import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER import org.apache.texera.amber.error.ErrorUtils.{mkConsoleMessage, safely} +import java.time.Instant import java.util.concurrent.LinkedBlockingQueue class DataProcessor( @@ -189,7 +171,7 @@ class DataProcessor( PORT_ALIGNMENT, EndIterationRequest(worker) ) - outputManager.saveTupleToStorageIfNeeded(Right(actorId.name), outputPortOpt) + outputManager.saveTupleToStorageIfNeeded(Right("Iteration number = 0, " + worker.name), outputPortOpt) outputManager.closeOutputStorageWriterIfNeeded(portId) asyncRPCClient.controllerInterface.portCompleted(PortCompletedRequest(portId, input = false), asyncRPCClient.mkContext(CONTROLLER)) // fix this line, add iteration completed rpc executor.reset() @@ -247,8 +229,10 @@ class DataProcessor( inputManager.currentChannelId = channelId val command = ecm.commandMapping.get(actorId.name) logger.info(s"receive ECM from $channelId, id = ${ecm.id}, cmd = $command") + + asyncRPCClient.controllerInterface.consoleMessageTriggered( - ConsoleMessageTriggeredRequest(mkConsoleMessage(actorId, s"receive ECM from $channelId, id = ${ecm.id}, cmd = $command")), + ConsoleMessageTriggeredRequest(ConsoleMessage(actorId.name, Timestamp(Instant.now), PRINT, "", s"received ECM from MATERIALIZATION_READER, id = ${ecm.id}", s"cmd = $command")), asyncRPCClient.mkContext(CONTROLLER) ) if (ecm.ecmType != NO_ALIGNMENT) { From 4fb2a6b463b5352b61178e3dcd58daa266d59ee6 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Wed, 4 Feb 2026 08:51:18 -0800 Subject: [PATCH 12/15] init --- .../architecture/rpc/controlcommands.proto | 6 ++ .../architecture/rpc/controllerservice.proto | 1 + .../architecture/rpc/controlreturns.proto | 1 + ...ControllerAsyncRPCHandlerInitializer.scala | 1 + .../IterationCompletedHandler.scala | 78 +++++++++++++++++++ .../messaginglayer/OutputManager.scala | 20 ++--- .../RegionExecutionCoordinator.scala | 14 +++- .../WorkflowExecutionCoordinator.scala | 24 +++++- .../architecture/worker/DataProcessor.scala | 7 +- .../OutputPortResultWriterThread.scala | 15 +--- .../promisehandlers/EndIterationHandler.scala | 1 + .../NextIterationHandler.scala | 12 +-- .../texera/amber/engine/common/Utils.scala | 2 + .../workflow-editor.component.ts | 1 + .../service/joint-ui/joint-ui.service.ts | 3 + .../types/execute-workflow.interface.ts | 1 + 16 files changed, 153 insertions(+), 34 deletions(-) create mode 100644 amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/IterationCompletedHandler.scala diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto index c2816408d1e..0943e459ee8 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto @@ -46,6 +46,7 @@ message ControlRequest { PortCompletedRequest portCompletedRequest = 9; WorkerStateUpdatedRequest workerStateUpdatedRequest = 10; LinkWorkersRequest linkWorkersRequest = 11; + IterationCompletedRequest iterationCompletedRequest = 12; // request for worker AddInputChannelRequest addInputChannelRequest = 50; @@ -169,6 +170,11 @@ message PortCompletedRequest { bool input = 2; } +// Notify controller that an output port has finished one iteration (used by loop operators). +message IterationCompletedRequest { + core.PortIdentity portId = 1 [(scalapb.field).no_box = true]; +} + message WorkerStateUpdatedRequest { worker.WorkerState state = 1 [(scalapb.field).no_box = true]; } diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto index 70d189a3411..35d88dcf0f1 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto @@ -37,6 +37,7 @@ service ControllerService { rpc EvaluatePythonExpression(EvaluatePythonExpressionRequest) returns (EvaluatePythonExpressionResponse); rpc ConsoleMessageTriggered(ConsoleMessageTriggeredRequest) returns (EmptyReturn); rpc PortCompleted(PortCompletedRequest) returns (EmptyReturn); + rpc IterationCompleted(IterationCompletedRequest) returns (EmptyReturn); rpc StartWorkflow(EmptyRequest) returns (StartWorkflowResponse); rpc ResumeWorkflow(EmptyRequest) returns (EmptyReturn); rpc PauseWorkflow(EmptyRequest) returns (EmptyReturn); diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto index 43613b5cfdc..98405c91b78 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto @@ -122,6 +122,7 @@ enum WorkflowAggregatedState { PAUSED = 4; RESUMING = 5; COMPLETED = 6; + ITERATION_COMPLETED = 11; FAILED = 7; UNKNOWN = 8; KILLED = 9; diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala index 4d9a36bab43..d0add4789b5 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala @@ -40,6 +40,7 @@ class ControllerAsyncRPCHandlerInitializer( with ResumeHandler with StartWorkflowHandler with PortCompletedHandler + with IterationCompletedHandler with ConsoleMessageHandler with RetryWorkflowHandler with EvaluatePythonExpressionHandler diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/IterationCompletedHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/IterationCompletedHandler.scala new file mode 100644 index 00000000000..2aab1e4be00 --- /dev/null +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/IterationCompletedHandler.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.engine.architecture.controller.promisehandlers + +import com.twitter.util.Future +import org.apache.texera.amber.core.WorkflowRuntimeException +import org.apache.texera.amber.core.workflow.GlobalPortIdentity +import org.apache.texera.amber.engine.architecture.controller.{ + ControllerAsyncRPCHandlerInitializer, + FatalError +} +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ + AsyncRPCContext, + IterationCompletedRequest, + QueryStatisticsRequest +} +import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn +import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER +import org.apache.texera.amber.util.VirtualIdentityUtils + +/** Notify controller that a worker has completed an iteration on an output port. + * + * This is different from [[PortCompletedHandler]]: a port can have multiple iterations + * (e.g., loop execution) before the whole port is fully completed. + */ +trait IterationCompletedHandler { + this: ControllerAsyncRPCHandlerInitializer => + + override def iterationCompleted( + msg: IterationCompletedRequest, + ctx: AsyncRPCContext + ): Future[EmptyReturn] = { + controllerInterface + .controllerInitiateQueryStatistics(QueryStatisticsRequest(scala.Seq(ctx.sender)), CONTROLLER) + .map { _ => + val globalPortId = GlobalPortIdentity( + VirtualIdentityUtils.getPhysicalOpId(ctx.sender), + msg.portId + ) + + cp.workflowExecutionCoordinator.getRegionOfPortId(globalPortId) match { + case Some(region) => + // Emit UI-only IterationCompleted phase for this region. + cp.workflowExecutionCoordinator.markRegionIterationCompletedIfNeeded(region) + + // Keep scheduler running + cp.workflowExecutionCoordinator + .coordinateRegionExecutors(cp.actorService) + .onFailure { + case err: WorkflowRuntimeException => + sendToClient(FatalError(err, err.relatedWorkerId)) + case other => + sendToClient(FatalError(other, None)) + } + case None => + } + + EmptyReturn() + } + } +} diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala index 0bc826c42df..bb8843ff1cc 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala @@ -118,6 +118,10 @@ class OutputManager( : mutable.HashMap[PortIdentity, OutputPortResultWriterThread] = mutable.HashMap() + val ECMWriterThreads + : mutable.HashMap[PortIdentity, BufferedItemWriter[Tuple]] = + mutable.HashMap() + /** * Add down stream operator and its corresponding Partitioner. * @@ -209,7 +213,7 @@ class OutputManager( * @param outputPortId If not specified, the tuple will be written to all output ports that need storage. */ def saveTupleToStorageIfNeeded( - tuple: Either[Tuple, String], + tuple: Tuple, outputPortId: Option[PortIdentity] = None ): Unit = { (outputPortId match { @@ -239,7 +243,6 @@ class OutputManager( writerThread.join() case None => } - } def getPort(portId: PortIdentity): WorkerPort = ports(portId) @@ -281,19 +284,18 @@ class OutputManager( } private def setupOutputStorageWriterThread(portId: PortIdentity, storageUri: URI): Unit = { - val bufferedTupleWriter = DocumentFactory - .openDocument(storageUri) - ._1 + this.ECMWriterThreads(portId) = DocumentFactory + .createDocument(storageUri.resolve("ecm"), ResultSchema.ecmSchema) .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString) .asInstanceOf[BufferedItemWriter[Tuple]] - val ecmUri = storageUri.resolve("ecm") - val bufferedECMWriter = DocumentFactory - .createDocument(ecmUri, ResultSchema.ecmSchema) + val bufferedTupleWriter = DocumentFactory + .openDocument(storageUri) + ._1 .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString) .asInstanceOf[BufferedItemWriter[Tuple]] - val writerThread = new OutputPortResultWriterThread(bufferedTupleWriter, bufferedECMWriter) + val writerThread = new OutputPortResultWriterThread(bufferedTupleWriter) this.outputPortResultWriterThreads(portId) = writerThread writerThread.start() } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index 7e5b228801f..84b18a48b57 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -103,6 +103,7 @@ class RegionExecutionCoordinator( private case object Unexecuted extends RegionExecutionPhase private case object ExecutingDependeePortsPhase extends RegionExecutionPhase private case object ExecutingNonDependeePortsPhase extends RegionExecutionPhase + private case object IterationCompleted extends RegionExecutionPhase private case object Completed extends RegionExecutionPhase private val currentPhaseRef: AtomicReference[RegionExecutionPhase] = new AtomicReference( @@ -190,7 +191,8 @@ class RegionExecutionCoordinator( } } - def isCompleted: Boolean = currentPhaseRef.get == Completed + // Treat IterationCompleted as not completed from scheduler perspective. + def isCompleted: Boolean = currentPhaseRef.get == Completed || currentPhaseRef.get == IterationCompleted /** * This will sync and transition the region execution phase from one to another depending on its current phase: @@ -219,6 +221,9 @@ class RegionExecutionCoordinator( } case ExecutingNonDependeePortsPhase => tryCompleteRegionExecution() + case IterationCompleted => + // IterationCompleted is a UI/observability phase; scheduling doesn't advance on it. + Future.Unit case Completed => // Already completed, no further action needed. Future.Unit @@ -543,6 +548,13 @@ class RegionExecutionCoordinator( } } + /** Emit IterationCompleted region phase to frontend. This does not affect scheduling semantics. */ + def setIterationCompletedPhase(): Unit = { + if (currentPhaseRef.get != Completed) { + setPhase(IterationCompleted) + } + } + private def setPhase(phase: RegionExecutionPhase): Unit = { currentPhaseRef.set(phase) SessionState.getAllSessionStates.foreach { state => diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala index 05585f88d8d..8939c2f4f38 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala @@ -27,7 +27,8 @@ import org.apache.texera.amber.engine.architecture.common.{ AkkaActorService } import org.apache.texera.amber.engine.architecture.controller.ControllerConfig -import org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution +import org.apache.texera.amber.engine.architecture.controller.execution.{OperatorExecution, RegionExecution, WorkflowExecution} +import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient import scala.collection.mutable @@ -102,6 +103,27 @@ class WorkflowExecutionCoordinator( .unit } + /** + * Mark the given operator as iteration-completed. + * + * This is a UI/observability state only; it doesn't change the worker-level state machine. + */ + def markOperatorIterationCompleted(operatorExecution: OperatorExecution): Unit = { + // No-op placeholder: operator aggregated state is computed from WorkerState. + // IterationCompleted is currently an observability/UI state driven by region phase events. + () + } + + /** + * If all operators in the region have reached ITERATION_COMPLETED (or COMPLETED), mark the region as + * ITERATION_COMPLETED (region is still considered not completed, so scheduling remains unchanged). + */ + def markRegionIterationCompletedIfNeeded(region: Region): Unit = { + // Best-effort: tell region coordinator to emit an IterationCompleted phase event. + // This doesn't affect scheduling/completion semantics. + regionExecutionCoordinators.get(region.id).foreach(_.setIterationCompletedPhase()) + } + def getRegionOfLink(link: PhysicalLink): Region = { getExecutingRegions.find(region => region.getLinks.contains(link)).get } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala index 72d9932bcc8..dedfc578027 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala @@ -24,6 +24,7 @@ import com.softwaremill.macwire.wire import io.grpc.MethodDescriptor import org.apache.texera.amber.core.executor.OperatorExecutor import org.apache.texera.amber.core.state.State +import org.apache.texera.amber.core.storage.result.ResultSchema import org.apache.texera.amber.core.tuple._ import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity, EmbeddedControlMessageIdentity} import org.apache.texera.amber.core.workflow.PortIdentity @@ -171,15 +172,15 @@ class DataProcessor( PORT_ALIGNMENT, EndIterationRequest(worker) ) - outputManager.saveTupleToStorageIfNeeded(Right("Iteration number = 0, " + worker.name), outputPortOpt) + outputManager.ECMWriterThreads(portId).putOne(new Tuple(ResultSchema.ecmSchema, Array(worker.name))) outputManager.closeOutputStorageWriterIfNeeded(portId) - asyncRPCClient.controllerInterface.portCompleted(PortCompletedRequest(portId, input = false), asyncRPCClient.mkContext(CONTROLLER)) // fix this line, add iteration completed rpc + asyncRPCClient.controllerInterface.iterationCompleted(IterationCompletedRequest(portId), asyncRPCClient.mkContext(CONTROLLER)) executor.reset() case schemaEnforceable: SchemaEnforceable => val portIdentity = outputPortOpt.getOrElse(outputManager.getSingleOutputPortIdentity) val tuple = schemaEnforceable.enforceSchema(outputManager.getPort(portIdentity).schema) statisticsManager.increaseOutputStatistics(portIdentity, tuple.inMemSize) - outputManager.saveTupleToStorageIfNeeded(Left(tuple), outputPortOpt) + outputManager.saveTupleToStorageIfNeeded(tuple, outputPortOpt) case other => // skip for now } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala index f8d0710aa88..188a08d28e5 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala @@ -30,28 +30,21 @@ sealed trait TerminateSignal case object PortStorageWriterTerminateSignal extends TerminateSignal class OutputPortResultWriterThread( - bufferedTupleWriter: BufferedItemWriter[Tuple], - bufferedECMWriter: BufferedItemWriter[Tuple] + bufferedTupleWriter: BufferedItemWriter[Tuple] ) extends Thread { - val queue: LinkedBlockingQueue[Either[Either[Tuple, String], TerminateSignal]] = - Queues.newLinkedBlockingQueue[Either[Either[Tuple, String], TerminateSignal]]() + val queue: LinkedBlockingQueue[Either[Tuple, TerminateSignal]] = + Queues.newLinkedBlockingQueue[Either[Tuple, TerminateSignal]]() override def run(): Unit = { var internalStop = false while (!internalStop) { val queueContent = queue.take() queueContent match { - case Left(item) => item match { - case Left(tuple) => bufferedTupleWriter.putOne(tuple) - case Right(ecm) => - val ecmTuple = new Tuple(ResultSchema.ecmSchema, Array(ecm)) - bufferedECMWriter.putOne(ecmTuple) - } + case Left(tuple) => bufferedTupleWriter.putOne(tuple) case Right(_) => internalStop = true } } bufferedTupleWriter.close() - bufferedECMWriter.close() } } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala index d38cf9854f3..fe76fb93bb0 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala @@ -23,6 +23,7 @@ import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{AsyncRPC import org.apache.texera.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer import com.twitter.util.Future import org.apache.texera.amber.core.tuple.FinalizePort +import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn import org.apache.texera.amber.operator.loop.{LoopEndOpExec, LoopStartOpExec} diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala index ed8cc475d6f..0bf1691ed5d 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala @@ -20,10 +20,8 @@ package org.apache.texera.amber.engine.architecture.worker.promisehandlers import com.twitter.util.Future -import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ - AsyncRPCContext, - EmptyRequest -} +import org.apache.texera.amber.core.tuple.FinalizeIteration +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext, EmptyRequest} import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn import org.apache.texera.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer import org.apache.texera.amber.operator.loop.LoopStartOpExec @@ -36,11 +34,7 @@ trait NextIterationHandler { ctx: AsyncRPCContext ): Future[EmptyReturn] = { dp.processOnFinish() - if (dp.executor.asInstanceOf[LoopStartOpExec].checkCondition()) { - dp.outputManager.finalizeIteration(dp.actorId) - } else { - dp.outputManager.finalizeOutput() - } + dp.outputManager.finalizeIteration(dp.actorId) EmptyReturn() } } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/common/Utils.scala b/amber/src/main/scala/org/apache/texera/amber/engine/common/Utils.scala index dc074c1094d..488b5e02050 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/common/Utils.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/common/Utils.scala @@ -98,6 +98,7 @@ object Utils extends LazyLogging { case WorkflowAggregatedState.PAUSING => "Pausing" case WorkflowAggregatedState.PAUSED => "Paused" case WorkflowAggregatedState.RESUMING => "Resuming" + case WorkflowAggregatedState.ITERATION_COMPLETED => "IterationCompleted" case WorkflowAggregatedState.COMPLETED => "Completed" case WorkflowAggregatedState.TERMINATED => "Terminated" case WorkflowAggregatedState.FAILED => "Failed" @@ -117,6 +118,7 @@ object Utils extends LazyLogging { case "pausing" => WorkflowAggregatedState.PAUSING case "paused" => WorkflowAggregatedState.PAUSED case "resuming" => WorkflowAggregatedState.RESUMING + case "iterationcompleted" => WorkflowAggregatedState.ITERATION_COMPLETED case "completed" => WorkflowAggregatedState.COMPLETED case "failed" => WorkflowAggregatedState.FAILED case "killed" => WorkflowAggregatedState.KILLED diff --git a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts index b23f92caf32..08bc601c4a7 100644 --- a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts +++ b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts @@ -384,6 +384,7 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy const colorMap: Record = { ExecutingDependeePortsPhase: "rgba(33,150,243,0.2)", ExecutingNonDependeePortsPhase: "rgba(255,213,79,0.2)", + IterationCompleted: "rgba(156,39,176,0.2)", Completed: "rgba(76,175,80,0.2)", }; this.paper.getModelById("region-" + region.id).attr("body/fill", colorMap[region.state]); diff --git a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts index 77458947cd6..f54068ad636 100644 --- a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts +++ b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts @@ -401,6 +401,9 @@ export class JointUIService { case OperatorState.Ready: fillColor = "#a6bd37"; break; + case OperatorState.IterationCompleted: + fillColor = "#9C27B0"; + break; case OperatorState.Completed: fillColor = "green"; break; diff --git a/frontend/src/app/workspace/types/execute-workflow.interface.ts b/frontend/src/app/workspace/types/execute-workflow.interface.ts index 23ade231998..0033ac39c33 100644 --- a/frontend/src/app/workspace/types/execute-workflow.interface.ts +++ b/frontend/src/app/workspace/types/execute-workflow.interface.ts @@ -74,6 +74,7 @@ export enum OperatorState { Pausing = "Pausing", Paused = "Paused", Resuming = "Resuming", + IterationCompleted = "IterationCompleted", Completed = "Completed", Recovering = "Recovering", } From acbb0b5b402c76bd98be3b580eb9d0a4dc9336c0 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Wed, 4 Feb 2026 11:17:41 -0800 Subject: [PATCH 13/15] fix --- .../architecture/worker/DataProcessor.scala | 1 + .../promisehandlers/EndIterationHandler.scala | 32 +++++++++---------- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala index dedfc578027..adbc97bfe47 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala @@ -173,6 +173,7 @@ class DataProcessor( EndIterationRequest(worker) ) outputManager.ECMWriterThreads(portId).putOne(new Tuple(ResultSchema.ecmSchema, Array(worker.name))) + outputManager.ECMWriterThreads(portId).close() outputManager.closeOutputStorageWriterIfNeeded(portId) asyncRPCClient.controllerInterface.iterationCompleted(IterationCompletedRequest(portId), asyncRPCClient.mkContext(CONTROLLER)) executor.reset() diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala index fe76fb93bb0..80869dc17f0 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala @@ -38,24 +38,24 @@ trait EndIterationHandler { case _: LoopEndOpExec => workerInterface.nextIteration(EmptyRequest(), mkContext(request.worker)) case _ => - val channelId = dp.inputManager.currentChannelId - val portId = dp.inputGateway.getChannel(channelId).getPortId - dp.inputManager.getPort(portId).completed = true - dp.inputManager.initBatch(channelId, Array.empty) - dp.processOnFinish() + } + val channelId = dp.inputManager.currentChannelId + val portId = dp.inputGateway.getChannel(channelId).getPortId + dp.inputManager.getPort(portId).completed = true + dp.inputManager.initBatch(channelId, Array.empty) + dp.processOnFinish() - dp.outputManager.outputIterator.appendSpecialTupleToEnd( - FinalizePort(portId, input = true) - ) + dp.outputManager.outputIterator.appendSpecialTupleToEnd( + FinalizePort(portId, input = true) + ) - if (dp.inputManager.getAllPorts.forall(portId => dp.inputManager.isPortCompleted(portId))) { - // Need this check for handling input port dependency relationships. - // See documentation of isMissingOutputPort - if (!dp.outputManager.isMissingOutputPort) { - // assuming all the output ports finalize after all input ports are finalized. - dp.outputManager.finalizeIteration(request.worker) - } - } + if (dp.inputManager.getAllPorts.forall(portId => dp.inputManager.isPortCompleted(portId))) { + // Need this check for handling input port dependency relationships. + // See documentation of isMissingOutputPort + if (!dp.outputManager.isMissingOutputPort) { + // assuming all the output ports finalize after all input ports are finalized. + dp.outputManager.finalizeIteration(request.worker) + } } EmptyReturn() } From 9899d08b48a16038bc61b6891f0661e93f7274d7 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Wed, 4 Feb 2026 18:15:17 -0800 Subject: [PATCH 14/15] update --- .../amber/engine/architecture/worker/DataProcessor.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala index adbc97bfe47..13f0b17de4f 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala @@ -172,8 +172,9 @@ class DataProcessor( PORT_ALIGNMENT, EndIterationRequest(worker) ) - outputManager.ECMWriterThreads(portId).putOne(new Tuple(ResultSchema.ecmSchema, Array(worker.name))) - outputManager.ECMWriterThreads(portId).close() + val writer = outputManager.ECMWriterThreads(portId) + writer.putOne(new Tuple(ResultSchema.ecmSchema, Array(worker.name))) + writer.close() outputManager.closeOutputStorageWriterIfNeeded(portId) asyncRPCClient.controllerInterface.iterationCompleted(IterationCompletedRequest(portId), asyncRPCClient.mkContext(CONTROLLER)) executor.reset() From f6b64b0fbe9f31eeb93f43a9d72487265b05c9bc Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Wed, 4 Feb 2026 19:58:40 -0800 Subject: [PATCH 15/15] update --- .../architecture/controller/WorkflowScheduler.scala | 6 +++++- .../controller/execution/RegionExecution.scala | 2 -- .../controller/execution/WorkflowExecution.scala | 5 ----- .../engine/architecture/scheduling/Schedule.scala | 10 +++++++++- 4 files changed, 14 insertions(+), 9 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala index 9dcf3ad4bfc..aa51f3f0ccd 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala @@ -52,6 +52,10 @@ class WorkflowScheduler( this.physicalPlan = updatedPhysicalPlan } - def getNextRegions: Set[Region] = if (!schedule.hasNext) Set() else schedule.next() + def getNextRegions: Set[Region] = { + val region : Set[Region] = if (!schedule.hasNext) Set() else schedule.loopNext() + println("current Region: " + region) + region + } } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecution.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecution.scala index d5939c2e3b1..e905c2b0449 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecution.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecution.scala @@ -59,8 +59,6 @@ case class RegionExecution(region: Region) { physicalOpId: PhysicalOpIdentity, inheritOperatorExecution: Option[OperatorExecution] = None ): OperatorExecution = { - assert(!operatorExecutions.contains(physicalOpId), "OperatorExecution already exists.") - operatorExecutions.getOrElseUpdate( physicalOpId, inheritOperatorExecution diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala index dea9b692a4f..b8b6d68091c 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala @@ -44,11 +44,6 @@ case class WorkflowExecution() { * @throws AssertionError if the `RegionExecution` has already been initialized. */ def initRegionExecution(region: Region): RegionExecution = { - // ensure the region execution hasn't been initialized already. - assert( - !regionExecutions.contains(region.id), - s"RegionExecution of ${region.id} already initialized." - ) regionExecutions.getOrElseUpdate(region.id, RegionExecution(region)) } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala index 6f34c9ed1e5..47474b8478c 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala @@ -21,14 +21,22 @@ package org.apache.texera.amber.engine.architecture.scheduling case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterator[Set[Region]] { private var currentLevel = levelSets.keys.minOption.getOrElse(0) - + private var loopStartLevel = currentLevel def getRegions: List[Region] = levelSets.values.flatten.toList override def hasNext: Boolean = levelSets.isDefinedAt(currentLevel) override def next(): Set[Region] = { val regions = levelSets(currentLevel) + if(regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopStart-operator-")))) loopStartLevel = currentLevel currentLevel += 1 regions } + + def loopNext(): Set[Region] = { + val regions = levelSets(currentLevel) + if(regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopEnd-operator-")))) currentLevel = loopStartLevel + else currentLevel += 1 + regions + } }