cron.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. package cron
  2. import (
  3. "context"
  4. "database/sql"
  5. "fmt"
  6. "log/slog"
  7. "sync"
  8. "time"
  9. cf "goflare/internal/cloudflare"
  10. "goflare/internal/database/queries"
  11. "goflare/internal/ipfetcher"
  12. "github.com/robfig/cron/v3"
  13. )
  14. const (
  15. settingKeyCurrentIP = "current_ip"
  16. settingKeyCurrentIPUpdatedAt = "current_ip_updated_at"
  17. settingKeyCurrentIPSource = "current_ip_source"
  18. )
  19. type DDNSUpdater struct {
  20. q *queries.Queries
  21. cron *cron.Cron
  22. entryID cron.EntryID
  23. schedule string
  24. mu sync.Mutex
  25. lastIP string
  26. }
  27. func NewDDNSUpdater(q *queries.Queries) *DDNSUpdater {
  28. return &DDNSUpdater{
  29. q: q,
  30. cron: cron.New(),
  31. }
  32. }
  33. func (u *DDNSUpdater) Start(schedule string) error {
  34. id, err := u.cron.AddFunc(schedule, u.run)
  35. if err != nil {
  36. return fmt.Errorf("invalid cron schedule %q: %w", schedule, err)
  37. }
  38. u.entryID = id
  39. u.schedule = schedule
  40. u.cron.Start()
  41. ctx := context.Background()
  42. cached, err := u.q.GetSetting(ctx, settingKeyCurrentIP)
  43. if err == nil && cached != "" {
  44. u.mu.Lock()
  45. u.lastIP = cached
  46. u.mu.Unlock()
  47. }
  48. if err != nil && err != sql.ErrNoRows {
  49. slog.Warn("failed to load current_ip setting", "error", err)
  50. }
  51. slog.Info("ddns updater started", "schedule", schedule)
  52. return nil
  53. }
  54. func (u *DDNSUpdater) Stop() {
  55. u.cron.Stop()
  56. slog.Info("ddns updater stopped")
  57. }
  58. func (u *DDNSUpdater) Schedule() string {
  59. u.mu.Lock()
  60. defer u.mu.Unlock()
  61. return u.schedule
  62. }
  63. func (u *DDNSUpdater) Reschedule(schedule string) error {
  64. u.mu.Lock()
  65. defer u.mu.Unlock()
  66. id, err := u.cron.AddFunc(schedule, u.run)
  67. if err != nil {
  68. return fmt.Errorf("invalid cron schedule %q: %w", schedule, err)
  69. }
  70. u.cron.Remove(u.entryID)
  71. u.entryID = id
  72. u.schedule = schedule
  73. slog.Info("ddns updater rescheduled", "schedule", schedule)
  74. return nil
  75. }
  76. func (u *DDNSUpdater) run() {
  77. ctx := context.Background()
  78. urls, err := u.q.ListEnabledIpProviderUrls(ctx)
  79. if err != nil {
  80. slog.Error("failed to load IP providers", "error", err)
  81. return
  82. }
  83. var urlsOrNil []string
  84. if len(urls) > 0 {
  85. urlsOrNil = urls
  86. }
  87. res, err := ipfetcher.FetchPublicIPFrom(ctx, urlsOrNil)
  88. if err != nil {
  89. slog.Error("failed to fetch public IP", "error", err)
  90. return
  91. }
  92. ip := res.IP
  93. u.mu.Lock()
  94. lastIP := u.lastIP
  95. u.mu.Unlock()
  96. if ip == lastIP {
  97. slog.Debug("public IP unchanged", "ip", ip)
  98. return
  99. }
  100. slog.Info("public IP changed", "old", lastIP, "new", ip)
  101. records, err := u.q.ListNonStaticRecords(ctx)
  102. if err != nil {
  103. slog.Error("failed to list non-static records", "error", err)
  104. return
  105. }
  106. if len(records) == 0 {
  107. now := time.Now().UTC().Format(time.RFC3339)
  108. if err := u.q.UpsertSetting(ctx, queries.UpsertSettingParams{Key: settingKeyCurrentIP, Value: ip}); err != nil {
  109. slog.Error("failed to persist current_ip", "error", err)
  110. } else if err := u.q.UpsertSetting(ctx, queries.UpsertSettingParams{Key: settingKeyCurrentIPUpdatedAt, Value: now}); err != nil {
  111. slog.Error("failed to persist current_ip_updated_at", "error", err)
  112. } else if err := u.q.UpsertSetting(ctx, queries.UpsertSettingParams{Key: settingKeyCurrentIPSource, Value: res.Source}); err != nil {
  113. slog.Error("failed to persist current_ip_source", "error", err)
  114. }
  115. u.mu.Lock()
  116. u.lastIP = ip
  117. u.mu.Unlock()
  118. return
  119. }
  120. allOK := true
  121. for _, rec := range records {
  122. err := cf.UpdateDNSRecord(ctx, rec.ZoneApiKey, rec.CfZoneID, rec.CfRecordID, cf.DNSRecord{
  123. Type: rec.Type,
  124. Name: rec.Name,
  125. Content: ip,
  126. Proxied: rec.Proxied == 1,
  127. TTL: 1, // auto
  128. })
  129. if err != nil {
  130. slog.Error("failed to update DNS record",
  131. "record", rec.Name,
  132. "zone", rec.CfZoneID,
  133. "error", err,
  134. )
  135. allOK = false
  136. continue
  137. }
  138. err = u.q.UpdateRecordContent(ctx, queries.UpdateRecordContentParams{
  139. ID: rec.ID,
  140. Content: ip,
  141. })
  142. if err != nil {
  143. slog.Error("failed to update local record content",
  144. "record", rec.Name,
  145. "error", err,
  146. )
  147. allOK = false
  148. continue
  149. }
  150. slog.Info("updated DNS record", "record", rec.Name, "ip", ip)
  151. }
  152. if allOK {
  153. now := time.Now().UTC().Format(time.RFC3339)
  154. if err := u.q.UpsertSetting(ctx, queries.UpsertSettingParams{Key: settingKeyCurrentIP, Value: ip}); err != nil {
  155. slog.Error("failed to persist current_ip", "error", err)
  156. } else if err := u.q.UpsertSetting(ctx, queries.UpsertSettingParams{Key: settingKeyCurrentIPUpdatedAt, Value: now}); err != nil {
  157. slog.Error("failed to persist current_ip_updated_at", "error", err)
  158. } else if err := u.q.UpsertSetting(ctx, queries.UpsertSettingParams{Key: settingKeyCurrentIPSource, Value: res.Source}); err != nil {
  159. slog.Error("failed to persist current_ip_source", "error", err)
  160. }
  161. u.mu.Lock()
  162. u.lastIP = ip
  163. u.mu.Unlock()
  164. }
  165. }