|   1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
 | func (jm *ControllerV2) syncCronJob(
  cj *batchv1.CronJob,
  js []*batchv1.Job) (*batchv1.CronJob, *time.Duration, error) {
  cj = cj.DeepCopy()
  now := jm.now()
  childrenJobs := make(map[types.UID]bool)
  for _, j := range js {
    childrenJobs[j.ObjectMeta.UID] = true
    // 检测当前 job 是否正在运行
    found := inActiveList(*cj, j.ObjectMeta.UID)
    // 如果不是,job 也没完成
    if !found && !IsJobFinished(j) {
      // 在此查找 cronjob 对象
      cjCopy, err := jm.cronJobControl.GetCronJob(cj.Namespace, cj.Name)
      if err != nil {
        return nil, nil, err
      }
      // 再检测当前 job 是否正在运行,防止创建完 job 后控制器奔溃没有更新到 status ,或者 cronjob 的 status 不对最新的状态,再次检查防止漏了。
      if inActiveList(*cjCopy, j.ObjectMeta.UID) {
        cj = cjCopy
        continue
      }
      jm.recorder.Eventf(cj, corev1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %s", j.Name)
      // 这里也是防止 job 加入到 inActiveList 后,job 完成了但 cronjob 的 status 不是最新的的情况
    } else if found && IsJobFinished(j) {
      _, status := getFinishedStatus(j)
      // 把它从活跃 map 中移除
      deleteFromActiveList(cj, j.ObjectMeta.UID)
      jm.recorder.Eventf(cj, corev1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, status: %v", j.Name, status)
    } else if IsJobFinished(j) {
      // 如果已经完成,更新 cronjob 的完成时间
      if cj.Status.LastSuccessfulTime == nil {
        cj.Status.LastSuccessfulTime = j.Status.CompletionTime
      }
      if j.Status.CompletionTime != nil && j.Status.CompletionTime.After(cj.Status.LastSuccessfulTime.Time) {
        cj.Status.LastSuccessfulTime = j.Status.CompletionTime
      }
    }
  }
  // 再次检查 job 是否完成,并将已完成的 job 移出活跃 map 列表
  for _, j := range cj.Status.Active {
    _, found := childrenJobs[j.UID]
    if found {
      continue
    }
    // 直接从 api-server 获取,防止本地缓存没有及时同步而拿不到最新的结果
    _, err := jm.jobControl.GetJob(j.Namespace, j.Name)
    switch {
    case errors.IsNotFound(err):
      // 如果已经不存在了
      jm.recorder.Eventf(cj, corev1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
      deleteFromActiveList(cj, j.UID)
    case err != nil:
      return cj, nil, err
    }
    // the job is missing in the lister but found in api-server
  }
  // 更新 cronjob status
  updatedCJ, err := jm.cronJobControl.UpdateStatus(cj)
  if err != nil {
    klog.V(2).InfoS("Unable to update status for cronjob", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "resourceVersion", cj.ResourceVersion, "err", err)
    return cj, nil, err
  }
  *cj = *updatedCJ
  if cj.DeletionTimestamp != nil {
    // The CronJob is being deleted.
    // Don't do anything other than updating status.
    return cj, nil, nil
  }
  // 如果是暂停状态则直接返回,不做处理
  if cj.Spec.Suspend != nil && *cj.Spec.Suspend {
    klog.V(4).InfoS("Not starting job because the cron is suspended", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()))
    return cj, nil, nil
  }
  // 解析调度时间
  sched, err := cron.ParseStandard(cj.Spec.Schedule)
  if err != nil {
    // this is likely a user error in defining the spec value
    // we should log the error and not reconcile this cronjob until an update to spec
    klog.V(2).InfoS("Unparseable schedule", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "schedule", cj.Spec.Schedule, "err", err)
    jm.recorder.Eventf(cj, corev1.EventTypeWarning, "UnparseableSchedule", "unparseable schedule: %q : %s", cj.Spec.Schedule, err)
    return cj, nil, nil
  }
  // 解析时区
  if strings.Contains(cj.Spec.Schedule, "TZ") {
    jm.recorder.Eventf(cj, corev1.EventTypeWarning, "UnsupportedSchedule", "CRON_TZ or TZ used in schedule %q is not officially supported, see https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/ for more details", cj.Spec.Schedule)
  }
  // 获取下一次启动 job 的时间
  scheduledTime, err := getNextScheduleTime(*cj, now, sched, jm.recorder)
  if err != nil {
    // this is likely a user error in defining the spec value
    // we should log the error and not reconcile this cronjob until an update to spec
    klog.V(2).InfoS("invalid schedule", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "schedule", cj.Spec.Schedule, "err", err)
    jm.recorder.Eventf(cj, corev1.EventTypeWarning, "InvalidSchedule", "invalid schedule: %s : %s", cj.Spec.Schedule, err)
    return cj, nil, nil
  }
  // 如果是 nil 
  if scheduledTime == nil {
    // 最早的运行时间比当前时间还早,记录一下日志
    // the scheduled time, that will give atleast 1 unmet time schedule
    klog.V(4).InfoS("No unmet start times", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()))
    // 加了 100ms 用于在下次入列的时候调整时间差
    t := nextScheduledTimeDuration(sched, now)
    return cj, t, nil
  }
  tooLate := false
  if cj.Spec.StartingDeadlineSeconds != nil {
    tooLate = scheduledTime.Add(time.Second * time.Duration(*cj.Spec.StartingDeadlineSeconds)).Before(now)
  }
  // 调度时间 + StartingDeadlineSeconds 比当前时间还要超前,说明已经错过了运行时间,已经太晚了
  if tooLate {
    klog.V(4).InfoS("Missed starting window", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()))
    jm.recorder.Eventf(cj, corev1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.UTC().Format(time.RFC1123Z))
    // 加 100ms 调整时间差
    t := nextScheduledTimeDuration(sched, now)
    return cj, t, nil
  }
  if isJobInActiveList(&batchv1.Job{
    ObjectMeta: metav1.ObjectMeta{
      Name:      getJobName(cj, *scheduledTime),
      Namespace: cj.Namespace,
    }}, cj.Status.Active) || cj.Status.LastScheduleTime.Equal(&metav1.Time{Time: *scheduledTime}) {
    klog.V(4).InfoS("Not starting job because the scheduled time is already processed", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "schedule", scheduledTime)
    t := nextScheduledTimeDuration(sched, now)
    return cj, t, nil
  }
  if cj.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(cj.Status.Active) > 0 {
    // Regardless which source of information we use for the set of active jobs,
    // there is some risk that we won't see an active job when there is one.
    // (because we haven't seen the status update to the SJ or the created pod).
    // So it is theoretically possible to have concurrency with Forbid.
    // As long the as the invocations are "far enough apart in time", this usually won't happen.
    //
    // TODO: for Forbid, we could use the same name for every execution, as a lock.
    // With replace, we could use a name that is deterministic per execution time.
    // But that would mean that you could not inspect prior successes or failures of Forbid jobs.
    klog.V(4).InfoS("Not starting job because prior execution is still running and concurrency policy is Forbid", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()))
    jm.recorder.Eventf(cj, corev1.EventTypeNormal, "JobAlreadyActive", "Not starting job because prior execution is running and concurrency policy is Forbid")
    t := nextScheduledTimeDuration(sched, now)
    return cj, t, nil
  }
  if cj.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
    for _, j := range cj.Status.Active {
      klog.V(4).InfoS("Deleting job that was still running at next scheduled start time", "job", klog.KRef(j.Namespace, j.Name))
      job, err := jm.jobControl.GetJob(j.Namespace, j.Name)
      if err != nil {
        jm.recorder.Eventf(cj, corev1.EventTypeWarning, "FailedGet", "Get job: %v", err)
        return cj, nil, err
      }
      if !deleteJob(cj, job, jm.jobControl, jm.recorder) {
        return cj, nil, fmt.Errorf("could not replace job %s/%s", job.Namespace, job.Name)
      }
    }
  }
  jobReq, err := getJobFromTemplate2(cj, *scheduledTime)
  if err != nil {
    klog.ErrorS(err, "Unable to make Job from template", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()))
    return cj, nil, err
  }
  jobResp, err := jm.jobControl.CreateJob(cj.Namespace, jobReq)
  switch {
  case errors.HasStatusCause(err, corev1.NamespaceTerminatingCause):
  case errors.IsAlreadyExists(err):
    // If the job is created by other actor, assume  it has updated the cronjob status accordingly
    klog.InfoS("Job already exists", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "job", klog.KRef(jobReq.GetNamespace(), jobReq.GetName()))
    return cj, nil, err
  case err != nil:
    // default error handling
    jm.recorder.Eventf(cj, corev1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
    return cj, nil, err
  }
  metrics.CronJobCreationSkew.Observe(jobResp.ObjectMeta.GetCreationTimestamp().Sub(*scheduledTime).Seconds())
  klog.V(4).InfoS("Created Job", "job", klog.KRef(jobResp.GetNamespace(), jobResp.GetName()), "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()))
  jm.recorder.Eventf(cj, corev1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
  // ------------------------------------------------------------------ //
  // 更新 cronjob status
  jobRef, err := getRef(jobResp)
  if err != nil {
    klog.V(2).InfoS("Unable to make object reference", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "err", err)
    return cj, nil, fmt.Errorf("unable to make object reference for job for %s", klog.KRef(cj.GetNamespace(), cj.GetName()))
  }
  cj.Status.Active = append(cj.Status.Active, *jobRef)
  cj.Status.LastScheduleTime = &metav1.Time{Time: *scheduledTime}
  updatedCJ, err = jm.cronJobControl.UpdateStatus(cj)
  if err != nil {
    klog.InfoS("Unable to update status", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "resourceVersion", cj.ResourceVersion, "err", err)
    return cj, nil, fmt.Errorf("unable to update status for %s (rv = %s): %v", klog.KRef(cj.GetNamespace(), cj.GetName()), cj.ResourceVersion, err)
  }
  t := nextScheduledTimeDuration(sched, now)
  return updatedCJ, t, nil
}
// getNextScheduleTime computes the schedule time that syncCronJob should act
// on for cj, as seen at now.
//
// The search window starts at the CronJob's LastScheduleTime when one is
// recorded, otherwise at its creation timestamp. When StartingDeadlineSeconds
// is set, the start is moved forward to now minus that deadline if that is
// later. If the window start lies after now, it returns (nil, nil).
//
// Otherwise the window is handed to getMostRecentScheduleTime and its time and
// error are returned unchanged. When that helper reports more than 100 missed
// schedules, a warning event and a log line are emitted; the result is still
// returned.
func getNextScheduleTime(cj batchv1.CronJob, now time.Time, schedule cron.Schedule, recorder record.EventRecorder) (*time.Time, error) {
  // Default to the creation time; prefer the last schedule time when present.
  windowStart := cj.ObjectMeta.CreationTimestamp.Time
  if last := cj.Status.LastScheduleTime; last != nil {
    windowStart = last.Time
  }

  // With a starting deadline, never look further back than now - deadline.
  if deadline := cj.Spec.StartingDeadlineSeconds; deadline != nil {
    cutoff := now.Add(-time.Second * time.Duration(*deadline))
    if cutoff.After(windowStart) {
      windowStart = cutoff
    }
  }

  // Nothing can be due when the window starts in the future.
  if windowStart.After(now) {
    return nil, nil
  }

  // Find the schedule time within the window and count the missed runs.
  mostRecent, missed, err := getMostRecentScheduleTime(windowStart, now, schedule)
  if missed > 100 {
    // Surface an unusually large backlog of missed runs to the user.
    recorder.Eventf(&cj, corev1.EventTypeWarning, "TooManyMissedTimes", "too many missed start times: %d. Set or decrease .spec.startingDeadlineSeconds or check clock skew", missed)
    klog.InfoS("too many missed times", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "missed times", missed)
  }
  return mostRecent, err
}
 |