new_worker()はBackgrounDRb側のプロセスで実行される。
ぱっと見る限り、スレッドが並列して動くように見える。
各ワーカはスレッドセーフになるように気をつける必要があるのかな?
def new_worker(opts={}) @mutex.synchronize { job_key = opts[:job_key] || gen_key unless self[job_key] if opts[:singleton] == true worker = get_worker_by_class(opts[:class]) return worker if worker end self[job_key] = instantiate_worker(opts[:class]).new(job_key, opts[:args]) self[job_key].start_process @timestamps[job_key][:expire_type] = opts[:expire_type] || :created @timestamps[job_key][:ttl] = opts[:ttl] || :immortal return job_key else raise ::BackgrounDRbDuplicateKeyError end } end
def start_process return if schedule_first_run && schedule_first_run.to_i > Time.now.to_i @thread = Thread.new do Thread.current[:safe_to_kill] = ConditionVariable.new Thread.current[:kill] = false begin do_work(@args) rescue Exception => e @logger.error "#{ e.message } - (#{ e.class })" << "\n" << (e.backtrace or []).join("\n") end end @next_start = @interval.from_now if schedule_repeat end