@@ -794,16 +794,13 @@ def schedule(self):
794794 self .schedule_job (job_id , cpu , memory )
795795
796796 def handle_aborts (self ):
797- cluster_name = os .environ ['INFRABOX_CLUSTER_NAME' ]
798-
799797 cursor = self .conn .cursor ()
800798 cursor .execute ('''
801799 SELECT j.id, a.user_id
802800 FROM abort a
803801 JOIN job j
804802 ON a.job_id = j.id
805- AND j.cluster_name = %s
806- ''' , [cluster_name ])
803+ ''' )
807804
808805 aborts = cursor .fetchall ()
809806 cursor .close ()
@@ -871,34 +868,35 @@ def handle_timeouts(self):
871868 cursor = self .conn .cursor ()
872869 cursor .execute ("""
873870 UPDATE job SET state = 'error', end_date = current_timestamp, message = 'Aborted due to timeout'
874- WHERE id = %s""" , (job_id ,))
871+ WHERE id = %s and state = 'running' """ , (job_id ,))
875872 cursor .close ()
876873
877874 def upload_console (self , job_id ):
878875 cursor = self .conn .cursor ()
879- cursor .execute ("""
880- SELECT output FROM console WHERE job_id = %s
881- ORDER BY date
882- """ , [job_id ])
883- lines = cursor .fetchall ()
884- cursor .close ()
876+ cursor .execute ("begin" )
877+ try :
878+ cursor .execute ("""
879+ SELECT output FROM console WHERE job_id = %s
880+ ORDER BY date FOR UPDATE
881+ """ , [job_id ])
882+ lines = cursor .fetchall ()
885883
886- output = ""
887- for l in lines :
888- output += l [0 ]
884+ output = ""
885+ for l in lines :
886+ output += l [0 ]
889887
890- cursor = self .conn .cursor ()
891- cursor .execute ("""
892- UPDATE job SET console = %s WHERE id = %s;
893- DELETE FROM console WHERE job_id = %s;
894- """ , [output , job_id , job_id ])
895- cursor .close ()
888+ if output :
889+ cursor .execute ("""
890+ UPDATE job SET console = %s WHERE id = %s;
891+ DELETE FROM console WHERE job_id = %s;
892+ """ , [output , job_id , job_id ])
893+ cursor .execute ("commit" )
894+ except Exception as e :
895+ self .logger .error (e )
896+ cursor .execute ("rollback" )
897+ finally :
898+ cursor .close ()
896899
897- cursor = self .conn .cursor ()
898- cursor .execute ("""
899- DELETE FROM console WHERE job_id = %s
900- """ , [job_id ])
901- cursor .close ()
902900
903901 def handle_orphaned_jobs (self ):
904902 self .logger .debug ("Handling orphaned jobs" )
0 commit comments