Created
March 9, 2026 10:17
-
-
Save kmsec-uk/9f81b4d6e449ee6830334f98feb5b05c to your computer and use it in GitHub Desktop.
GitHub events API follower
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| /* | |
| 99% of this was deferred to Gemini (thanks G). | |
| Problem: GHArchive [does not get every page](https://github.com/igrigorik/gharchive.org/issues/310) | |
| so we are missing lots of events. | |
| This is a basic implementation of a GitHub Events feed follower that fetches all 3 pages. | |
| */ | |
| import ( | |
| "context" | |
| "encoding/json" | |
| "errors" | |
| "fmt" | |
| "io" | |
| "log" | |
| "net/http" | |
| "os" | |
| "os/signal" | |
| "sort" | |
| "strconv" | |
| "sync" | |
| "sync/atomic" | |
| "syscall" | |
| "time" | |
| ) | |
// Event is a placeholder for your actual parsing struct.
type Event struct {
	ID string `json:"id"`
	// TODO: Replace or extend with your own struct fields
}

// GetIDAsInt converts the event's string ID into an int64 so that IDs can be
// compared numerically. A missing or non-numeric ID yields 0.
func (e Event) GetIDAsInt() int64 {
	parsed, parseErr := strconv.ParseInt(e.ID, 10, 64)
	if parseErr != nil {
		return 0
	}
	return parsed
}
const (
	// baseURL is the public GitHub events endpoint; the page number is
	// appended by fetchPage. per_page=100 is the API's maximum page size.
	baseURL = "https://api.github.com/events?per_page=100&page="
	// sleepDuration is the pause between polling iterations (and after a
	// failed fetch), bounding the loop at ~4 probes per second.
	sleepDuration = 250 * time.Millisecond
)
| func main() { | |
| // 1. Setup context for graceful shutdown via OS signals | |
| ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) | |
| defer stop() | |
| token := os.Getenv("GITHUB_TOKEN") | |
| if token == "" { | |
| log.Println("Warning: GITHUB_TOKEN environment variable not set. Rate limit is 60/hr.") | |
| } | |
| // Reduced timeout to 5 seconds per requirements | |
| client := &http.Client{Timeout: 5 * time.Second} | |
| // STATE | |
| etags := make(map[int]string) | |
| var lastKnownID int64 = 0 | |
| pages := []int{1, 2, 3} | |
| var billableRequests atomic.Int64 // Lock-free counter for API usage tracking | |
| // 2. Metrics Goroutine: Periodically calculate and print API usage | |
| usageInterval := 10 * time.Second | |
| go func() { | |
| ticker := time.NewTicker(usageInterval) | |
| defer ticker.Stop() | |
| startTime := time.Now() | |
| for { | |
| select { | |
| case <-ctx.Done(): | |
| return | |
| case <-ticker.C: | |
| reqs := billableRequests.Load() | |
| elapsedHours := time.Since(startTime).Hours() | |
| if elapsedHours > 0 { | |
| estPerHour := float64(reqs) / elapsedHours | |
| log.Printf("[API Usage] Billable requests: %d | Estimated rate: %.0f requests/hour", reqs, estPerHour) | |
| } | |
| } | |
| } | |
| }() | |
| log.Println("Initializing ETags for pages 1, 2, and 3...") | |
| for _, p := range pages { | |
| status, etag, _, err := fetchPage(ctx, client, token, p, "", &billableRequests) | |
| if err != nil { | |
| if errors.Is(err, context.Canceled) { | |
| log.Println("Initialization aborted by user.") | |
| return | |
| } | |
| log.Fatalf("Failed initial fetch for page %d: %v", p, err) | |
| } | |
| if status == http.StatusOK { | |
| etags[p] = etag | |
| } | |
| } | |
| log.Println("Initialization complete. Starting polling loop...") | |
| for { | |
| // Check for shutdown before each polling iteration | |
| if ctx.Err() != nil { | |
| log.Println("Graceful shutdown initiated. Exiting polling loop.") | |
| return | |
| } | |
| // Step 2a: probe page 1 with conditional GET | |
| status1, etag1, body1, err := fetchPage(ctx, client, token, 1, etags[1], &billableRequests) | |
| if err != nil { | |
| if errors.Is(err, context.Canceled) { | |
| log.Println("Graceful shutdown initiated during page 1 fetch.") | |
| return | |
| } | |
| log.Printf("Error fetching page 1: %v", err) | |
| if sleepContext(ctx, sleepDuration) { | |
| return | |
| } | |
| continue | |
| } | |
| if status1 == http.StatusNotModified { | |
| if sleepContext(ctx, sleepDuration) { | |
| return | |
| } | |
| continue | |
| } | |
| // Step 2b: page 1 changed → update and fetch 2 & 3 in parallel | |
| if status1 == http.StatusOK { | |
| etags[1] = etag1 | |
| } | |
| var ( | |
| wg sync.WaitGroup | |
| mu sync.Mutex | |
| allBody [][]byte | |
| ) | |
| allBody = append(allBody, body1) | |
| for _, p := range []int{2, 3} { | |
| wg.Go(func() { | |
| status, newEtag, body, err := fetchPage(ctx, client, token, p, etags[p], &billableRequests) | |
| if err != nil { | |
| if !errors.Is(err, context.Canceled) { | |
| log.Printf("Error fetching page %d: %v", p, err) | |
| } | |
| return | |
| } | |
| mu.Lock() | |
| defer mu.Unlock() | |
| if status == http.StatusOK { | |
| etags[p] = newEtag | |
| allBody = append(allBody, body) | |
| } | |
| }) | |
| } | |
| wg.Wait() | |
| // Step 2c: merge, parse, and filter | |
| var mergedEvents []Event | |
| for _, b := range allBody { | |
| mergedEvents = append(mergedEvents, parseEvents(b)...) | |
| } | |
| var newEvents []Event | |
| var maxID int64 = 0 | |
| for _, item := range mergedEvents { | |
| idInt := item.GetIDAsInt() | |
| if idInt > lastKnownID { | |
| newEvents = append(newEvents, item) | |
| if idInt > maxID { | |
| maxID = idInt | |
| } | |
| } | |
| } | |
| // Step 2d: advance watermark and process | |
| if len(newEvents) > 0 { | |
| sort.Slice(newEvents, func(i, j int) bool { | |
| return newEvents[i].GetIDAsInt() < newEvents[j].GetIDAsInt() | |
| }) | |
| log.Printf("Found %d new events. Advancing watermark from %d to %d.", len(newEvents), lastKnownID, maxID) | |
| // TODO: Call your process logic here | |
| // processEvents(newEvents) | |
| lastKnownID = maxID | |
| } | |
| if sleepContext(ctx, sleepDuration) { | |
| return | |
| } | |
| } | |
| } | |
| // fetchPage is a helper to execute the GET request with Context and rate limit handling. | |
| func fetchPage(ctx context.Context, client *http.Client, token string, page int, etag string, billableRequests *atomic.Int64) (int, string, []byte, error) { | |
| url := fmt.Sprintf("%s%d", baseURL, page) | |
| // Refactored to use NewRequestWithContext | |
| req, err := http.NewRequestWithContext(ctx, "GET", url, nil) | |
| if err != nil { | |
| return 0, "", nil, err | |
| } | |
| req.Header.Set("Accept", "application/vnd.github+json") | |
| req.Header.Set("User-Agent", "go (kmsec.uk)") | |
| if token != "" { | |
| req.Header.Set("Authorization", "Bearer "+token) | |
| } | |
| if etag != "" { | |
| req.Header.Set("If-None-Match", etag) | |
| } | |
| resp, err := client.Do(req) | |
| if err != nil { | |
| return 0, "", nil, err | |
| } | |
| defer resp.Body.Close() | |
| // Only count requests that hit our GitHub limit (304 Not Modified doesn't count) | |
| if resp.StatusCode != http.StatusNotModified { | |
| billableRequests.Add(1) | |
| } | |
| // Intercept rate limiting dynamically | |
| if resp.StatusCode == http.StatusForbidden || resp.StatusCode == http.StatusTooManyRequests { | |
| if err := handleRateLimit(ctx, resp); err != nil { | |
| return resp.StatusCode, "", nil, err // Returns if context is cancelled during the sleep | |
| } | |
| return resp.StatusCode, "", nil, fmt.Errorf("rate limited on page %d", page) | |
| } | |
| body, err := io.ReadAll(resp.Body) | |
| if err != nil { | |
| return resp.StatusCode, "", nil, err | |
| } | |
| newEtag := resp.Header.Get("ETag") | |
| return resp.StatusCode, newEtag, body, nil | |
| } | |
| // handleRateLimit parses X-RateLimit-Reset and blocks until the window resets or context cancels. | |
| func handleRateLimit(ctx context.Context, resp *http.Response) error { | |
| resetHeader := resp.Header.Get("X-RateLimit-Reset") | |
| if resetHeader == "" { | |
| log.Println("Rate limited but no X-RateLimit-Reset header present. Sleeping for 60s.") | |
| return sleepContextErr(ctx, 60*time.Second) | |
| } | |
| resetUnix, err := strconv.ParseInt(resetHeader, 10, 64) | |
| if err != nil { | |
| log.Printf("Failed to parse reset time '%s'. Sleeping for 60s.", resetHeader) | |
| return sleepContextErr(ctx, 60*time.Second) | |
| } | |
| resetTime := time.Unix(resetUnix, 0) | |
| waitTime := time.Until(resetTime) | |
| if waitTime > 0 { | |
| log.Printf("Rate limit exhausted. Sleeping for %v until reset at %v", waitTime, resetTime.Format(time.RFC1123)) | |
| return sleepContextErr(ctx, waitTime+time.Second) | |
| } | |
| return nil | |
| } | |
// sleepContext pauses for d while remaining responsive to context
// cancellation. It reports true if the context was cancelled before the
// full duration elapsed, false otherwise.
func sleepContext(ctx context.Context, d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-timer.C:
		return false
	case <-ctx.Done():
		log.Println("Graceful shutdown initiated during sleep cycle.")
		return true
	}
}
// sleepContextErr waits for d or until ctx is cancelled, whichever comes
// first. It returns nil when the full duration elapsed and the context's
// error when cancellation cut the wait short.
func sleepContextErr(ctx context.Context, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-timer.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
| func parseEvents(body []byte) []Event { | |
| var events []Event | |
| if err := json.Unmarshal(body, &events); err != nil { | |
| log.Printf("Failed to unmarshal events: %v", err) | |
| return nil | |
| } | |
| return events | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment