Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion lib/cloud_controller/diego/tasks_sync.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ module VCAP::CloudController
module Diego
class TasksSync
BATCH_SIZE = 500
PENDING_TASK_EXPIRATION = 300

class Error < StandardError
end
Expand All @@ -21,10 +22,17 @@ def sync

to_update = []
to_cancel = []
expired_pending = []

batched_cc_tasks do |cc_tasks|
cc_tasks.each do |cc_task|
diego_task = diego_tasks.delete(cc_task.guid)

if cc_task.state == TaskModel::PENDING_STATE
expired_pending << cc_task.guid if cc_task.created_at < Time.now.utc - PENDING_TASK_EXPIRATION
next
end

next unless [TaskModel::RUNNING_STATE, TaskModel::CANCELING_STATE].include? cc_task.state

if diego_task.nil?
Expand All @@ -35,6 +43,8 @@ def sync
end
end

fail_expired_pending_tasks(expired_pending)

update_missing_diego_tasks(to_update)
cancel_cc_tasks(to_cancel)
cancel_missing_cc_tasks(diego_tasks)
Expand Down Expand Up @@ -97,6 +107,16 @@ def cancel_cc_tasks(to_cancel)
end
end

def fail_expired_pending_tasks(expired_pending)
expired_pending.each do |task_guid|
task = TaskModel.where(guid: task_guid, state: TaskModel::PENDING_STATE).first
next unless task

task.update(state: TaskModel::FAILED_STATE, failure_reason: 'Task expired in PENDING state')
logger.info('expired-pending-task', task_guid: task_guid)
end
end

def cancel_missing_cc_tasks(to_cancel_missing)
to_cancel_missing.each_key do |task_guid|
workpool.submit(task_guid) do |guid|
Expand All @@ -111,7 +131,7 @@ def batched_cc_tasks
loop do
tasks = TaskModel.where(
Sequel.lit('tasks.id > ?', last_id)
).order(:id).limit(BATCH_SIZE).select(:id, :guid, :state).all
).order(:id).limit(BATCH_SIZE).select(:id, :guid, :state, :created_at).all

yield tasks
return if tasks.count < BATCH_SIZE
Expand Down
41 changes: 37 additions & 4 deletions spec/unit/lib/cloud_controller/diego/tasks_sync_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,43 @@ def exceptions
let!(:succeeded_task) { TaskModel.make(:succeeded, created_at: 1.minute.ago) }
let(:bbs_tasks) { [] }

it 'does nothing to the task' do
expect { subject.sync }.not_to(change do
[pending_task.reload.state, succeeded_task.reload.state]
end)
it 'does not change the succeeded task' do
expect { subject.sync }.not_to(change { succeeded_task.reload.state })
end

it 'does not change a recently created pending task' do
expect { subject.sync }.not_to(change { pending_task.reload.state })
end

it 'bumps freshness' do
subject.sync
expect(bbs_task_client).to have_received(:bump_freshness).once
end
end

context 'when a pending task has expired' do
let!(:expired_pending_task) do
task = TaskModel.make(:pending)
task.this.update(created_at: 10.minutes.ago)
task.reload
end
let!(:recent_pending_task) { TaskModel.make(:pending) }
let(:bbs_tasks) { [] }

it 'fails the expired pending task' do
subject.sync

expect(expired_pending_task.reload.state).to eq(TaskModel::FAILED_STATE)
expect(expired_pending_task.reload.failure_reason).to eq('Task expired in PENDING state')
end

it 'does not fail the recent pending task' do
expect { subject.sync }.not_to(change { recent_pending_task.reload.state })
end

it 'logs the expired pending task' do
subject.sync
expect(logger).to have_received(:info).with('expired-pending-task', task_guid: expired_pending_task.guid)
end

it 'bumps freshness' do
Expand Down
Loading