cron.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. package cron
  import (
  	"context"
  	"database/sql"
  	"errors"
  	"fmt"
  	"log/slog"
  	"sync"
  	"time"

  	"github.com/robfig/cron/v3"

  	cf "goflare/internal/cloudflare"
  	"goflare/internal/database/queries"
  	"goflare/internal/ipfetcher"
  )
  // Keys under which the updater stores its state via the settings queries.
  const (
  	// settingKeyCurrentIP holds the most recently applied public IP.
  	settingKeyCurrentIP = "current_ip"

  	// settingKeyCurrentIPUpdatedAt holds the RFC 3339 UTC timestamp written
  	// together with settingKeyCurrentIP.
  	settingKeyCurrentIPUpdatedAt = "current_ip_updated_at"

  	// settingKeyCurrentIPSource holds the Source value reported by the IP
  	// fetcher for the stored address.
  	settingKeyCurrentIPSource = "current_ip_source"
  )
  19. type DDNSUpdater struct {
  20. q *queries.Queries
  21. cron *cron.Cron
  22. entryID cron.EntryID
  23. schedule string
  24. mu sync.Mutex
  25. lastIP string
  26. }
  27. func NewDDNSUpdater(q *queries.Queries) *DDNSUpdater {
  28. return &DDNSUpdater{
  29. q: q,
  30. cron: cron.New(),
  31. }
  32. }
  33. func (u *DDNSUpdater) Start(schedule string) error {
  34. id, err := u.cron.AddFunc(schedule, u.run)
  35. if err != nil {
  36. return fmt.Errorf("invalid cron schedule %q: %w", schedule, err)
  37. }
  38. u.entryID = id
  39. u.schedule = schedule
  40. u.cron.Start()
  41. ctx := context.Background()
  42. cached, err := u.q.GetSetting(ctx, settingKeyCurrentIP)
  43. if err == nil && cached != "" {
  44. u.mu.Lock()
  45. u.lastIP = cached
  46. u.mu.Unlock()
  47. }
  48. if err != nil && err != sql.ErrNoRows {
  49. slog.Warn("failed to load current_ip setting", "error", err)
  50. }
  51. slog.Info("ddns updater started", "schedule", schedule)
  52. return nil
  53. }
  54. func (u *DDNSUpdater) Stop() {
  55. u.cron.Stop()
  56. slog.Info("ddns updater stopped")
  57. }
  58. func (u *DDNSUpdater) Schedule() string {
  59. u.mu.Lock()
  60. defer u.mu.Unlock()
  61. return u.schedule
  62. }
  63. func (u *DDNSUpdater) Reschedule(schedule string) error {
  64. u.mu.Lock()
  65. defer u.mu.Unlock()
  66. id, err := u.cron.AddFunc(schedule, u.run)
  67. if err != nil {
  68. return fmt.Errorf("invalid cron schedule %q: %w", schedule, err)
  69. }
  70. u.cron.Remove(u.entryID)
  71. u.entryID = id
  72. u.schedule = schedule
  73. slog.Info("ddns updater rescheduled", "schedule", schedule)
  74. return nil
  75. }
  76. func (u *DDNSUpdater) run() {
  77. ctx := context.Background()
  78. urls, err := u.q.ListEnabledIpProviderUrls(ctx)
  79. if err != nil {
  80. slog.Error("failed to load IP providers", "error", err)
  81. return
  82. }
  83. var urlsOrNil []string
  84. if len(urls) > 0 {
  85. urlsOrNil = urls
  86. }
  87. res, err := ipfetcher.FetchPublicIPFrom(ctx, urlsOrNil)
  88. if err != nil {
  89. slog.Error("failed to fetch public IP", "error", err)
  90. return
  91. }
  92. ip := res.IP
  93. u.mu.Lock()
  94. lastIP := u.lastIP
  95. u.mu.Unlock()
  96. if ip == lastIP {
  97. slog.Debug("public IP unchanged", "ip", ip)
  98. return
  99. }
  100. slog.Info("public IP changed", "old", lastIP, "new", ip)
  101. records, err := u.q.ListNonStaticRecords(ctx)
  102. if err != nil {
  103. slog.Error("failed to list non-static records", "error", err)
  104. return
  105. }
  106. if len(records) == 0 {
  107. now := time.Now().UTC().Format(time.RFC3339)
  108. if err := u.q.UpsertSetting(ctx, queries.UpsertSettingParams{Key: settingKeyCurrentIP, Value: ip}); err != nil {
  109. slog.Error("failed to persist current_ip", "error", err)
  110. } else if err := u.q.UpsertSetting(ctx, queries.UpsertSettingParams{Key: settingKeyCurrentIPUpdatedAt, Value: now}); err != nil {
  111. slog.Error("failed to persist current_ip_updated_at", "error", err)
  112. } else if err := u.q.UpsertSetting(ctx, queries.UpsertSettingParams{Key: settingKeyCurrentIPSource, Value: res.Source}); err != nil {
  113. slog.Error("failed to persist current_ip_source", "error", err)
  114. }
  115. u.mu.Lock()
  116. u.lastIP = ip
  117. u.mu.Unlock()
  118. return
  119. }
  120. type zoneKey struct {
  121. apiKey string
  122. zoneID string
  123. }
  124. byZone := make(map[zoneKey][]queries.ListNonStaticRecordsRow)
  125. for _, rec := range records {
  126. k := zoneKey{apiKey: rec.ZoneApiKey, zoneID: rec.CfZoneID}
  127. byZone[k] = append(byZone[k], rec)
  128. }
  129. allOK := true
  130. for k, zoneRecs := range byZone {
  131. cfRecords := make([]cf.DNSRecord, len(zoneRecs))
  132. for i, rec := range zoneRecs {
  133. cfRecords[i] = cf.DNSRecord{
  134. ID: rec.CfRecordID,
  135. Type: rec.Type,
  136. Name: rec.Name,
  137. Content: ip,
  138. Proxied: rec.Proxied == 1,
  139. TTL: 1, // auto
  140. }
  141. }
  142. if err := cf.BatchUpdateDNSRecords(ctx, k.apiKey, k.zoneID, cfRecords); err != nil {
  143. slog.Error("failed to batch update DNS records",
  144. "zone", k.zoneID,
  145. "count", len(zoneRecs),
  146. "error", err,
  147. )
  148. allOK = false
  149. continue
  150. }
  151. for _, rec := range zoneRecs {
  152. if err := u.q.UpdateRecordContent(ctx, queries.UpdateRecordContentParams{
  153. ID: rec.ID,
  154. Content: ip,
  155. }); err != nil {
  156. slog.Error("failed to update local record content",
  157. "record", rec.Name,
  158. "error", err,
  159. )
  160. allOK = false
  161. continue
  162. }
  163. slog.Info("updated DNS record", "record", rec.Name, "ip", ip)
  164. }
  165. }
  166. if allOK {
  167. now := time.Now().UTC().Format(time.RFC3339)
  168. if err := u.q.UpsertSetting(ctx, queries.UpsertSettingParams{Key: settingKeyCurrentIP, Value: ip}); err != nil {
  169. slog.Error("failed to persist current_ip", "error", err)
  170. } else if err := u.q.UpsertSetting(ctx, queries.UpsertSettingParams{Key: settingKeyCurrentIPUpdatedAt, Value: now}); err != nil {
  171. slog.Error("failed to persist current_ip_updated_at", "error", err)
  172. } else if err := u.q.UpsertSetting(ctx, queries.UpsertSettingParams{Key: settingKeyCurrentIPSource, Value: res.Source}); err != nil {
  173. slog.Error("failed to persist current_ip_source", "error", err)
  174. }
  175. u.mu.Lock()
  176. u.lastIP = ip
  177. u.mu.Unlock()
  178. }
  179. }