Skip to content

Commit

Permalink
Support for schedules (#175)
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz authored Nov 4, 2024
1 parent 67741fc commit ecaab5c
Show file tree
Hide file tree
Showing 25 changed files with 2,579 additions and 60 deletions.
4 changes: 4 additions & 0 deletions temporalio/.rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ AllCops:
Layout/ClassStructure:
Enabled: true

# RBS annotations allowed
Layout/LeadingCommentSpace:
AllowRBSInlineAnnotation: true

# Don't need super for activities
Lint/MissingSuper:
AllowedParentClasses:
Expand Down
63 changes: 63 additions & 0 deletions temporalio/lib/temporalio/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
require 'temporalio/client/async_activity_handle'
require 'temporalio/client/connection'
require 'temporalio/client/interceptor'
require 'temporalio/client/schedule'
require 'temporalio/client/schedule_handle'
require 'temporalio/client/workflow_execution'
require 'temporalio/client/workflow_execution_count'
require 'temporalio/client/workflow_handle'
Expand Down Expand Up @@ -358,6 +360,67 @@ def count_workflows(query = nil, rpc_options: nil)
@impl.count_workflows(Interceptor::CountWorkflowsInput.new(query:, rpc_options:))
end

# Create a schedule and return its handle.
#
# @param id [String] Unique identifier of the schedule.
# @param schedule [Schedule] Schedule to create.
# @param trigger_immediately [Boolean] If true, trigger one action immediately when creating the schedule.
# @param backfills [Array<Schedule::Backfill>] Set of time periods to take actions on as if that time passed right
# now.
# @param memo [Hash<String, Object>, nil] Memo for the schedule. Memo for a scheduled workflow is part of the
# schedule action.
# @param search_attributes [SearchAttributes, nil] Search attributes for the schedule. Search attributes for a
# scheduled workflow are part of the scheduled action.
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
#
# @return [ScheduleHandle] A handle to the created schedule.
# @raise [Error::ScheduleAlreadyRunningError] If a schedule with this ID is already running.
# @raise [Error::RPCError] RPC error from call.
def create_schedule(
id,
schedule,
trigger_immediately: false,
backfills: [],
memo: nil,
search_attributes: nil,
rpc_options: nil
)
@impl.create_schedule(Interceptor::CreateScheduleInput.new(
id:,
schedule:,
trigger_immediately:,
backfills:,
memo:,
search_attributes:,
rpc_options:
))
end

# Get a schedule handle to an existing schedule for the given ID.
#
# @param id [String] Schedule ID to get a handle to.
# @return [ScheduleHandle] The schedule handle.
def schedule_handle(id)
ScheduleHandle.new(client: self, id:)
end

# List schedules.
#
# Note, this list is eventually consistent. Therefore if a schedule is added or deleted, it may not be available in
# the list immediately.
#
# @param query [String] A Temporal visibility list filter.
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
#
# @return [Enumerator<Schedule::List::Description>] Enumerable schedules.
#
# @raise [Error::RPCError] RPC error from call.
#
# @see https://docs.temporal.io/visibility
def list_schedules(query = nil, rpc_options: nil)
@impl.list_schedules(Interceptor::ListSchedulesInput.new(query:, rpc_options:))
end

# Get an async activity handle.
#
# @param task_token_or_id_reference [String, ActivityIDReference] Task token string or activity ID reference.
Expand Down
139 changes: 139 additions & 0 deletions temporalio/lib/temporalio/client/interceptor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,79 @@ def intercept_client(next_interceptor)
keyword_init: true
)

# Input for {Outbound.create_schedule}.
CreateScheduleInput = Struct.new(
:id,
:schedule,
:trigger_immediately,
:backfills,
:memo,
:search_attributes,
:rpc_options,
keyword_init: true
)

# Input for {Outbound.list_schedules}.
ListSchedulesInput = Struct.new(
:query,
:rpc_options,
keyword_init: true
)

# Input for {Outbound.backfill_schedule}.
BackfillScheduleInput = Struct.new(
:id,
:backfills,
:rpc_options,
keyword_init: true
)

# Input for {Outbound.delete_schedule}.
DeleteScheduleInput = Struct.new(
:id,
:rpc_options,
keyword_init: true
)

# Input for {Outbound.describe_schedule}.
DescribeScheduleInput = Struct.new(
:id,
:rpc_options,
keyword_init: true
)

# Input for {Outbound.pause_schedule}.
PauseScheduleInput = Struct.new(
:id,
:note,
:rpc_options,
keyword_init: true
)

# Input for {Outbound.trigger_schedule}.
TriggerScheduleInput = Struct.new(
:id,
:overlap,
:rpc_options,
keyword_init: true
)

# Input for {Outbound.unpause_schedule}.
UnpauseScheduleInput = Struct.new(
:id,
:note,
:rpc_options,
keyword_init: true
)

# Input for {Outbound.update_schedule}.
UpdateScheduleInput = Struct.new(
:id,
:updater,
:rpc_options,
keyword_init: true
)

# Input for {Outbound.heartbeat_async_activity}.
HeartbeatAsyncActivityInput = Struct.new(
:task_token_or_id_reference,
Expand Down Expand Up @@ -268,6 +341,72 @@ def terminate_workflow(input)
next_interceptor.terminate_workflow(input)
end

# Called for every {Client.create_schedule} call.
#
# @param input [CreateScheduleInput] Input.
# @return [ScheduleHandle] Schedule handle.
def create_schedule(input)
next_interceptor.create_schedule(input)
end

# Called for every {Client.list_schedules} call.
#
# @param input [ListSchedulesInput] Input.
# @return [Enumerator<Schedule::List::Description>] Enumerable schedules.
def list_schedules(input)
next_interceptor.list_schedules(input)
end

# Called for every {ScheduleHandle.backfill} call.
#
# @param input [BackfillScheduleInput] Input.
def backfill_schedule(input)
next_interceptor.backfill_schedule(input)
end

# Called for every {ScheduleHandle.delete} call.
#
# @param input [DeleteScheduleInput] Input.
def delete_schedule(input)
next_interceptor.delete_schedule(input)
end

# Called for every {ScheduleHandle.describe} call.
#
# @param input [DescribeScheduleInput] Input.
# @return [Schedule::Description] Schedule description.
def describe_schedule(input)
next_interceptor.describe_schedule(input)
end

# Called for every {ScheduleHandle.pause} call.
#
# @param input [PauseScheduleInput] Input.
def pause_schedule(input)
next_interceptor.pause_schedule(input)
end

# Called for every {ScheduleHandle.trigger} call.
#
# @param input [TriggerScheduleInput] Input.
def trigger_schedule(input)
next_interceptor.trigger_schedule(input)
end

# Called for every {ScheduleHandle.unpause} call.
#
# @param input [UnpauseScheduleInput] Input.
def unpause_schedule(input)
next_interceptor.unpause_schedule(input)
end

# Called for every {ScheduleHandle.update} call.
#
# @param input [UpdateScheduleInput] Input.
def update_schedule(input)
next_interceptor.update_schedule(input)
end

# Called for every {AsyncActivityHandle.heartbeat} call.
#
# @param input [HeartbeatAsyncActivityInput] Input.
Expand Down
Loading

0 comments on commit ecaab5c

Please sign in to comment.