Skip to content

Commit 026619b

Browse files
Copilottorosent
andcommitted
Fix getClient() to prevent multiple client instantiations
Co-authored-by: torosent <17064840+torosent@users.noreply.github.com>
1 parent 1ffe4d0 commit 026619b

File tree

3 files changed

+132
-14
lines changed

3 files changed

+132
-14
lines changed

azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/DurableClientContext.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ public String getTaskHubName() {
5050
* @return the Durable Task client object associated with the current function invocation.
5151
*/
5252
public DurableTaskClient getClient() {
53+
// Return existing client if already initialized
54+
if (this.client != null) {
55+
return this.client;
56+
}
57+
5358
if (this.rpcBaseUrl == null || this.rpcBaseUrl.length() == 0) {
5459
throw new IllegalStateException("The client context wasn't populated with an RPC base URL!");
5560
}
@@ -83,12 +88,10 @@ public HttpResponseMessage waitForCompletionOrCreateCheckStatusResponse(
8388
HttpRequestMessage<?> request,
8489
String instanceId,
8590
Duration timeout) {
86-
if (this.client == null) {
87-
this.client = getClient();
88-
}
91+
DurableTaskClient client = getClient();
8992
OrchestrationMetadata orchestration;
9093
try {
91-
orchestration = this.client.waitForInstanceCompletion(instanceId, timeout, true);
94+
orchestration = client.waitForInstanceCompletion(instanceId, timeout, true);
9295
return request.createResponseBuilder(HttpStatus.ACCEPTED)
9396
.header("Content-Type", "application/json")
9497
.body(orchestration.getSerializedOutput())
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
fbe5bb20835678099fc51a44993ed9b045dee5a6
1+
026329c53fe6363985655857b9ca848ec7238bd2

internal/durabletask-protobuf/protos/orchestrator_service.proto

Lines changed: 124 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ message TaskFailureDetails {
4141
google.protobuf.StringValue stackTrace = 3;
4242
TaskFailureDetails innerFailure = 4;
4343
bool isNonRetriable = 5;
44+
map<string, google.protobuf.Value> properties = 6;
4445
}
4546

4647
enum OrchestrationStatus {
@@ -95,6 +96,7 @@ message TaskScheduledEvent {
9596
google.protobuf.StringValue version = 2;
9697
google.protobuf.StringValue input = 3;
9798
TraceContext parentTraceContext = 4;
99+
map<string, string> tags = 5;
98100
}
99101

100102
message TaskCompletedEvent {
@@ -113,6 +115,7 @@ message SubOrchestrationInstanceCreatedEvent {
113115
google.protobuf.StringValue version = 3;
114116
google.protobuf.StringValue input = 4;
115117
TraceContext parentTraceContext = 5;
118+
map<string, string> tags = 6;
116119
}
117120

118121
message SubOrchestrationInstanceCompletedEvent {
@@ -192,7 +195,7 @@ message EntityOperationCalledEvent {
192195
}
193196

194197
message EntityLockRequestedEvent {
195-
string criticalSectionId = 1;
198+
string criticalSectionId = 1;
196199
repeated string lockSet = 2;
197200
int32 position = 3;
198201
google.protobuf.StringValue parentInstanceId = 4; // used only within messages, null in histories
@@ -217,7 +220,19 @@ message EntityUnlockSentEvent {
217220
message EntityLockGrantedEvent {
218221
string criticalSectionId = 1;
219222
}
220-
223+
224+
message ExecutionRewoundEvent {
225+
google.protobuf.StringValue reason = 1;
226+
google.protobuf.StringValue parentExecutionId = 2; // used only for rewinding suborchestrations, null otherwise
227+
google.protobuf.StringValue instanceId = 3; // used only for rewinding suborchestrations, null otherwise
228+
TraceContext parentTraceContext = 4; // used only for rewinding suborchestrations, null otherwise
229+
google.protobuf.StringValue name = 5; // used by DTS backend only
230+
google.protobuf.StringValue version = 6; // used by DTS backend only
231+
google.protobuf.StringValue input = 7; // used by DTS backend only
232+
ParentInstanceInfo parentInstance = 8; // used by DTS backend only
233+
map<string, string> tags = 9; // used by DTS backend only
234+
}
235+
221236
message HistoryEvent {
222237
int32 eventId = 1;
223238
google.protobuf.Timestamp timestamp = 2;
@@ -244,25 +259,30 @@ message HistoryEvent {
244259
ExecutionResumedEvent executionResumed = 22;
245260
EntityOperationSignaledEvent entityOperationSignaled = 23;
246261
EntityOperationCalledEvent entityOperationCalled = 24;
247-
EntityOperationCompletedEvent entityOperationCompleted = 25;
248-
EntityOperationFailedEvent entityOperationFailed = 26;
262+
EntityOperationCompletedEvent entityOperationCompleted = 25;
263+
EntityOperationFailedEvent entityOperationFailed = 26;
249264
EntityLockRequestedEvent entityLockRequested = 27;
250265
EntityLockGrantedEvent entityLockGranted = 28;
251266
EntityUnlockSentEvent entityUnlockSent = 29;
267+
ExecutionRewoundEvent executionRewound = 30;
252268
}
253269
}
254270

255271
message ScheduleTaskAction {
256272
string name = 1;
257273
google.protobuf.StringValue version = 2;
258274
google.protobuf.StringValue input = 3;
275+
map<string, string> tags = 4;
276+
TraceContext parentTraceContext = 5;
259277
}
260278

261279
message CreateSubOrchestrationAction {
262280
string instanceId = 1;
263281
string name = 2;
264282
google.protobuf.StringValue version = 3;
265283
google.protobuf.StringValue input = 4;
284+
TraceContext parentTraceContext = 5;
285+
map<string, string> tags = 6;
266286
}
267287

268288
message CreateTimerAction {
@@ -282,6 +302,7 @@ message CompleteOrchestrationAction {
282302
google.protobuf.StringValue newVersion = 4;
283303
repeated HistoryEvent carryoverEvents = 5;
284304
TaskFailureDetails failureDetails = 6;
305+
map<string, string> tags = 7;
285306
}
286307

287308
message TerminateOrchestrationAction {
@@ -312,6 +333,11 @@ message OrchestratorAction {
312333
}
313334
}
314335

336+
message OrchestrationTraceContext {
337+
google.protobuf.StringValue spanID = 1;
338+
google.protobuf.Timestamp spanStartTime = 2;
339+
}
340+
315341
message OrchestratorRequest {
316342
string instanceId = 1;
317343
google.protobuf.StringValue executionId = 2;
@@ -320,6 +346,8 @@ message OrchestratorRequest {
320346
OrchestratorEntityParameters entityParameters = 5;
321347
bool requiresHistoryStreaming = 6;
322348
map<string, google.protobuf.Value> properties = 7;
349+
350+
OrchestrationTraceContext orchestrationTraceContext = 8;
323351
}
324352

325353
message OrchestratorResponse {
@@ -331,6 +359,17 @@ message OrchestratorResponse {
331359
// The number of work item events that were processed by the orchestrator.
332360
// This field is optional. If not set, the service should assume that the orchestrator processed all events.
333361
google.protobuf.Int32Value numEventsProcessed = 5;
362+
OrchestrationTraceContext orchestrationTraceContext = 6;
363+
364+
// Whether or not a history is required to complete the original OrchestratorRequest and none was provided.
365+
bool requiresHistory = 7;
366+
367+
// True if this is a partial (chunked) completion. The backend must keep the work item open until the final chunk (isPartial=false).
368+
bool isPartial = 8;
369+
370+
// Zero-based position of the current chunk within a chunked completion sequence.
371+
// This field is omitted for non-chunked completions.
372+
google.protobuf.Int32Value chunkIndex = 9;
334373
}
335374

336375
message CreateInstanceRequest {
@@ -343,6 +382,7 @@ message CreateInstanceRequest {
343382
google.protobuf.StringValue executionId = 7;
344383
map<string, string> tags = 8;
345384
TraceContext parentTraceContext = 9;
385+
google.protobuf.Timestamp requestTime = 10;
346386
}
347387

348388
message OrchestrationIdReusePolicy {
@@ -449,12 +489,28 @@ message QueryInstancesResponse {
449489
google.protobuf.StringValue continuationToken = 2;
450490
}
451491

492+
message ListInstanceIdsRequest {
493+
repeated OrchestrationStatus runtimeStatus = 1;
494+
google.protobuf.Timestamp completedTimeFrom = 2;
495+
google.protobuf.Timestamp completedTimeTo = 3;
496+
int32 pageSize = 4;
497+
google.protobuf.StringValue lastInstanceKey = 5;
498+
}
499+
500+
message ListInstanceIdsResponse {
501+
repeated string instanceIds = 1;
502+
google.protobuf.StringValue lastInstanceKey = 2;
503+
}
504+
452505
message PurgeInstancesRequest {
453506
oneof request {
454507
string instanceId = 1;
455508
PurgeInstanceFilter purgeInstanceFilter = 2;
509+
InstanceBatch instanceBatch = 4;
456510
}
457511
bool recursive = 3;
512+
// used in the case when an instanceId is specified to determine if the purge request is for an orchestration (as opposed to an entity)
513+
bool isOrchestration = 5;
458514
}
459515

460516
message PurgeInstanceFilter {
@@ -468,6 +524,15 @@ message PurgeInstancesResponse {
468524
google.protobuf.BoolValue isComplete = 2;
469525
}
470526

527+
message RestartInstanceRequest {
528+
string instanceId = 1;
529+
bool restartWithNewInstanceId = 2;
530+
}
531+
532+
message RestartInstanceResponse {
533+
string instanceId = 1;
534+
}
535+
471536
message CreateTaskHubRequest {
472537
bool recreateIfExists = 1;
473538
}
@@ -490,10 +555,12 @@ message SignalEntityRequest {
490555
google.protobuf.StringValue input = 3;
491556
string requestId = 4;
492557
google.protobuf.Timestamp scheduledTime = 5;
558+
TraceContext parentTraceContext = 6;
559+
google.protobuf.Timestamp requestTime = 7;
493560
}
494561

495562
message SignalEntityResponse {
496-
// no payload
563+
// no payload
497564
}
498565

499566
message GetEntityRequest {
@@ -553,6 +620,7 @@ message EntityBatchRequest {
553620
string instanceId = 1;
554621
google.protobuf.StringValue entityState = 2;
555622
repeated OperationRequest operations = 3;
623+
map<string, google.protobuf.Value> properties = 4;
556624
}
557625

558626
message EntityBatchResult {
@@ -562,6 +630,8 @@ message EntityBatchResult {
562630
TaskFailureDetails failureDetails = 4;
563631
string completionToken = 5;
564632
repeated OperationInfo operationInfos = 6; // used only with DTS
633+
// Whether or not an entity state is required to complete the original EntityBatchRequest and none was provided.
634+
bool requiresState = 7;
565635
}
566636

567637
message EntityRequest {
@@ -575,6 +645,7 @@ message OperationRequest {
575645
string operation = 1;
576646
string requestId = 2;
577647
google.protobuf.StringValue input = 3;
648+
TraceContext traceContext = 4;
578649
}
579650

580651
message OperationResult {
@@ -591,10 +662,14 @@ message OperationInfo {
591662

592663
message OperationResultSuccess {
593664
google.protobuf.StringValue result = 1;
665+
google.protobuf.Timestamp startTimeUtc = 2;
666+
google.protobuf.Timestamp endTimeUtc = 3;
594667
}
595668

596669
message OperationResultFailure {
597670
TaskFailureDetails failureDetails = 1;
671+
google.protobuf.Timestamp startTimeUtc = 2;
672+
google.protobuf.Timestamp endTimeUtc = 3;
598673
}
599674

600675
message OperationAction {
@@ -610,6 +685,8 @@ message SendSignalAction {
610685
string name = 2;
611686
google.protobuf.StringValue input = 3;
612687
google.protobuf.Timestamp scheduledTime = 4;
688+
google.protobuf.Timestamp requestTime = 5;
689+
TraceContext parentTraceContext = 6;
613690
}
614691

615692
message StartNewOrchestrationAction {
@@ -618,6 +695,8 @@ message StartNewOrchestrationAction {
618695
google.protobuf.StringValue version = 3;
619696
google.protobuf.StringValue input = 4;
620697
google.protobuf.Timestamp scheduledTime = 5;
698+
google.protobuf.Timestamp requestTime = 6;
699+
TraceContext parentTraceContext = 7;
621700
}
622701

623702
message AbandonActivityTaskRequest {
@@ -644,6 +723,17 @@ message AbandonEntityTaskResponse {
644723
// Empty.
645724
}
646725

726+
message SkipGracefulOrchestrationTerminationsRequest {
727+
InstanceBatch instanceBatch = 1;
728+
google.protobuf.StringValue reason = 2;
729+
}
730+
731+
message SkipGracefulOrchestrationTerminationsResponse {
732+
// Those instances which could not be terminated because they had locked entities at the time of this termination call,
733+
// are already in a terminal state (completed, failed, terminated, etc.), are not orchestrations, or do not exist (i.e. have been purged)
734+
repeated string unterminatedInstanceIds = 1;
735+
}
736+
647737
service TaskHubSidecarService {
648738
// Sends a hello request to the sidecar service.
649739
rpc Hello(google.protobuf.Empty) returns (google.protobuf.Empty);
@@ -657,18 +747,21 @@ service TaskHubSidecarService {
657747
// Rewinds an orchestration instance to last known good state and replays from there.
658748
rpc RewindInstance(RewindInstanceRequest) returns (RewindInstanceResponse);
659749

750+
// Restarts an orchestration instance.
751+
rpc RestartInstance(RestartInstanceRequest) returns (RestartInstanceResponse);
752+
660753
// Waits for an orchestration instance to reach a running or completion state.
661754
rpc WaitForInstanceStart(GetInstanceRequest) returns (GetInstanceResponse);
662-
755+
663756
// Waits for an orchestration instance to reach a completion state (completed, failed, terminated, etc.).
664757
rpc WaitForInstanceCompletion(GetInstanceRequest) returns (GetInstanceResponse);
665758

666759
// Raises an event to a running orchestration instance.
667760
rpc RaiseEvent(RaiseEventRequest) returns (RaiseEventResponse);
668-
761+
669762
// Terminates a running orchestration instance.
670763
rpc TerminateInstance(TerminateRequest) returns (TerminateResponse);
671-
764+
672765
// Suspends a running orchestration instance.
673766
rpc SuspendInstance(SuspendRequest) returns (SuspendResponse);
674767

@@ -678,6 +771,9 @@ service TaskHubSidecarService {
678771
// rpc DeleteInstance(DeleteInstanceRequest) returns (DeleteInstanceResponse);
679772

680773
rpc QueryInstances(QueryInstancesRequest) returns (QueryInstancesResponse);
774+
775+
rpc ListInstanceIds(ListInstanceIdsRequest) returns (ListInstanceIdsResponse);
776+
681777
rpc PurgeInstances(PurgeInstancesRequest) returns (PurgeInstancesResponse);
682778

683779
rpc GetWorkItems(GetWorkItemsRequest) returns (stream WorkItem);
@@ -714,6 +810,10 @@ service TaskHubSidecarService {
714810

715811
// Abandon an entity work item
716812
rpc AbandonTaskEntityWorkItem(AbandonEntityTaskRequest) returns (AbandonEntityTaskResponse);
813+
814+
// "Skip" graceful termination of orchestrations by immediately changing their status in storage to "terminated".
815+
// Note that a maximum of 500 orchestrations can be terminated at a time using this method.
816+
rpc SkipGracefulOrchestrationTerminations(SkipGracefulOrchestrationTerminationsRequest) returns (SkipGracefulOrchestrationTerminationsResponse);
717817
}
718818

719819
message GetWorkItemsRequest {
@@ -732,6 +832,16 @@ enum WorkerCapability {
732832
// When set, the service may return work items without any history events as an optimization.
733833
// It is strongly recommended that all SDKs support this capability.
734834
WORKER_CAPABILITY_HISTORY_STREAMING = 1;
835+
836+
// Indicates that the worker supports scheduled tasks.
837+
// The service may send schedule-triggered orchestration work items,
838+
// and the worker must handle them, including the scheduledTime field.
839+
WORKER_CAPABILITY_SCHEDULED_TASKS = 2;
840+
841+
// Signals that the worker can handle large payloads stored externally (e.g., Blob Storage).
842+
// Work items may contain URI references instead of inline data, and the worker must fetch them.
843+
// This avoids message size limits and reduces network overhead.
844+
WORKER_CAPABILITY_LARGE_PAYLOADS = 3;
735845
}
736846

737847
message WorkItem {
@@ -750,7 +860,7 @@ message CompleteTaskResponse {
750860
}
751861

752862
message HealthPing {
753-
// No payload
863+
// No payload
754864
}
755865

756866
message StreamInstanceHistoryRequest {
@@ -764,3 +874,8 @@ message StreamInstanceHistoryRequest {
764874
message HistoryChunk {
765875
repeated HistoryEvent events = 1;
766876
}
877+
878+
message InstanceBatch {
879+
// A maximum of 500 instance IDs can be provided in this list.
880+
repeated string instanceIds = 1;
881+
}

0 commit comments

Comments
 (0)