Skip to content

Instantly share code, notes, and snippets.

@kmsec-uk
Created March 9, 2026 10:17
Show Gist options
  • Select an option

  • Save kmsec-uk/9f81b4d6e449ee6830334f98feb5b05c to your computer and use it in GitHub Desktop.

Select an option

Save kmsec-uk/9f81b4d6e449ee6830334f98feb5b05c to your computer and use it in GitHub Desktop.
GitHub events API follower
package main
/*
99% of this was deferred to Gemini (thanks G).
Problem: GHArchive [does not get every page](https://github.com/igrigorik/gharchive.org/issues/310)
so we are missing lots of events.
This is a basic implementation of a GitHub Events feed follower that fetches all 3 pages
*/
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"os"
"os/signal"
"sort"
"strconv"
"sync"
"sync/atomic"
"syscall"
"time"
)
// Event is a placeholder for your actual parsing struct.
type Event struct {
	// ID is the event identifier as delivered by the API: a decimal string.
	ID string `json:"id"`
	// TODO: Replace or extend with your own struct fields
}

// GetIDAsInt parses the string ID into an int64, returning 0 when the ID
// is empty or not a valid decimal number.
func (e Event) GetIDAsInt() int64 {
	if parsed, err := strconv.ParseInt(e.ID, 10, 64); err == nil {
		return parsed
	}
	return 0
}
const (
	// baseURL is the GitHub public events endpoint; the page number is
	// appended to it by fetchPage.
	baseURL = "https://api.github.com/events?per_page=100&page="
	// sleepDuration is the pause between polling iterations.
	sleepDuration = 250 * time.Millisecond
)
// main polls the GitHub events API (pages 1-3) using conditional GETs,
// merges the pages, and advances a high-water-mark event ID so each event
// is handed off at most once. It shuts down cleanly on SIGINT/SIGTERM.
func main() {
	// 1. Cancel ctx on SIGINT/SIGTERM so every loop below can exit cleanly.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	token := os.Getenv("GITHUB_TOKEN")
	if token == "" {
		log.Println("Warning: GITHUB_TOKEN environment variable not set. Rate limit is 60/hr.")
	}

	client := &http.Client{Timeout: 5 * time.Second}

	// STATE: one ETag per page for conditional GETs, plus the watermark of
	// the highest event ID already processed. etags is touched only by the
	// main goroutine, or under mu inside the page workers below.
	etags := make(map[int]string)
	var lastKnownID int64 = 0
	pages := []int{1, 2, 3}
	var billableRequests atomic.Int64 // lock-free counter for API usage tracking

	// 2. Metrics goroutine: periodically log estimated API usage.
	usageInterval := 10 * time.Second
	go func() {
		ticker := time.NewTicker(usageInterval)
		defer ticker.Stop()
		startTime := time.Now()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				reqs := billableRequests.Load()
				elapsedHours := time.Since(startTime).Hours()
				if elapsedHours > 0 {
					estPerHour := float64(reqs) / elapsedHours
					log.Printf("[API Usage] Billable requests: %d | Estimated rate: %.0f requests/hour", reqs, estPerHour)
				}
			}
		}
	}()

	// Seed the ETag cache with one unconditional fetch per page.
	log.Println("Initializing ETags for pages 1, 2, and 3...")
	for _, p := range pages {
		status, etag, _, err := fetchPage(ctx, client, token, p, "", &billableRequests)
		if err != nil {
			if errors.Is(err, context.Canceled) {
				log.Println("Initialization aborted by user.")
				return
			}
			log.Fatalf("Failed initial fetch for page %d: %v", p, err)
		}
		if status == http.StatusOK {
			etags[p] = etag
		}
	}
	log.Println("Initialization complete. Starting polling loop...")

	for {
		// Check for shutdown before each polling iteration.
		if ctx.Err() != nil {
			log.Println("Graceful shutdown initiated. Exiting polling loop.")
			return
		}

		// Step 2a: probe page 1 with a conditional GET; 304 means no change.
		status1, etag1, body1, err := fetchPage(ctx, client, token, 1, etags[1], &billableRequests)
		if err != nil {
			if errors.Is(err, context.Canceled) {
				log.Println("Graceful shutdown initiated during page 1 fetch.")
				return
			}
			log.Printf("Error fetching page 1: %v", err)
			if sleepContext(ctx, sleepDuration) {
				return
			}
			continue
		}
		if status1 == http.StatusNotModified {
			if sleepContext(ctx, sleepDuration) {
				return
			}
			continue
		}
		if status1 == http.StatusOK {
			etags[1] = etag1
		}

		// Step 2b: page 1 changed → fetch pages 2 and 3 in parallel.
		var (
			wg      sync.WaitGroup
			mu      sync.Mutex
			allBody [][]byte
		)
		allBody = append(allBody, body1)
		for _, p := range []int{2, 3} {
			// BUGFIX: snapshot the ETag before spawning the worker. The
			// previous code read etags[p] inside the goroutine, which raced
			// with the sibling worker's (locked) write to the same map.
			prevEtag := etags[p]
			wg.Go(func() {
				status, newEtag, body, err := fetchPage(ctx, client, token, p, prevEtag, &billableRequests)
				if err != nil {
					if !errors.Is(err, context.Canceled) {
						log.Printf("Error fetching page %d: %v", p, err)
					}
					return
				}
				if status != http.StatusOK {
					return
				}
				mu.Lock()
				defer mu.Unlock()
				etags[p] = newEtag
				allBody = append(allBody, body)
			})
		}
		wg.Wait()

		// Step 2c: merge, parse, and filter. Pages can overlap between
		// requests, so drop duplicate IDs as well as anything at or below
		// the watermark (the previous code could emit the same event twice).
		var newEvents []Event
		seen := make(map[int64]struct{})
		maxID := lastKnownID
		for _, b := range allBody {
			for _, item := range parseEvents(b) {
				idInt := item.GetIDAsInt()
				if idInt <= lastKnownID {
					continue
				}
				if _, dup := seen[idInt]; dup {
					continue
				}
				seen[idInt] = struct{}{}
				newEvents = append(newEvents, item)
				if idInt > maxID {
					maxID = idInt
				}
			}
		}

		// Step 2d: advance the watermark and process in ascending ID order.
		if len(newEvents) > 0 {
			sort.Slice(newEvents, func(i, j int) bool {
				return newEvents[i].GetIDAsInt() < newEvents[j].GetIDAsInt()
			})
			log.Printf("Found %d new events. Advancing watermark from %d to %d.", len(newEvents), lastKnownID, maxID)
			// TODO: Call your process logic here
			// processEvents(newEvents)
			lastKnownID = maxID
		}

		if sleepContext(ctx, sleepDuration) {
			return
		}
	}
}
// fetchPage performs one (optionally conditional) GET of the given events
// page, honoring ctx for cancellation.
//
// It returns the HTTP status code, the response ETag, the raw body, and an
// error. A non-empty etag argument is sent as If-None-Match, so an unchanged
// page yields 304 with an empty body. billableRequests is incremented for
// every response except 304 (conditional hits don't count against the
// GitHub rate limit). On 403/429 it delegates to handleRateLimit to wait out
// the window, then returns a non-nil error either way so the caller retries.
func fetchPage(ctx context.Context, client *http.Client, token string, page int, etag string, billableRequests *atomic.Int64) (int, string, []byte, error) {
	url := fmt.Sprintf("%s%d", baseURL, page)

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return 0, "", nil, err
	}
	req.Header.Set("Accept", "application/vnd.github+json")
	req.Header.Set("User-Agent", "go (kmsec.uk)")
	if token != "" {
		req.Header.Set("Authorization", "Bearer "+token)
	}
	if etag != "" {
		req.Header.Set("If-None-Match", etag)
	}

	resp, err := client.Do(req)
	if err != nil {
		return 0, "", nil, err
	}
	defer resp.Body.Close()

	// Only count requests that hit our GitHub limit (304 Not Modified doesn't count).
	if resp.StatusCode != http.StatusNotModified {
		billableRequests.Add(1)
	}

	// Intercept rate limiting dynamically.
	if resp.StatusCode == http.StatusForbidden || resp.StatusCode == http.StatusTooManyRequests {
		// Best-effort drain so the keep-alive connection can be reused.
		_, _ = io.Copy(io.Discard, resp.Body)
		if err := handleRateLimit(ctx, resp); err != nil {
			return resp.StatusCode, "", nil, err // returns if context is cancelled during the sleep
		}
		return resp.StatusCode, "", nil, fmt.Errorf("rate limited on page %d", page)
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return resp.StatusCode, "", nil, err
	}
	return resp.StatusCode, resp.Header.Get("ETag"), body, nil
}
// handleRateLimit inspects the X-RateLimit-Reset header on a rate-limited
// response and blocks until the limit window resets (or ctx is cancelled).
// When the header is missing or unparseable it falls back to a flat 60s wait.
func handleRateLimit(ctx context.Context, resp *http.Response) error {
	const fallbackWait = 60 * time.Second

	header := resp.Header.Get("X-RateLimit-Reset")
	if header == "" {
		log.Println("Rate limited but no X-RateLimit-Reset header present. Sleeping for 60s.")
		return sleepContextErr(ctx, fallbackWait)
	}

	resetUnix, err := strconv.ParseInt(header, 10, 64)
	if err != nil {
		log.Printf("Failed to parse reset time '%s'. Sleeping for 60s.", header)
		return sleepContextErr(ctx, fallbackWait)
	}

	resetAt := time.Unix(resetUnix, 0)
	remaining := time.Until(resetAt)
	if remaining <= 0 {
		// Window already reset; no need to wait at all.
		return nil
	}
	log.Printf("Rate limit exhausted. Sleeping for %v until reset at %v", remaining, resetAt.Format(time.RFC1123))
	// Pad by one second to avoid waking just before the server-side reset.
	return sleepContextErr(ctx, remaining+time.Second)
}
// sleepContext safely sleeps while listening for context cancellations.
// Returns true if the context was cancelled.
func sleepContext(ctx context.Context, d time.Duration) bool {
select {
case <-ctx.Done():
log.Println("Graceful shutdown initiated during sleep cycle.")
return true
case <-time.After(d):
return false
}
}
// sleepContextErr functions similarly to sleepContext but returns the context error.
func sleepContextErr(ctx context.Context, d time.Duration) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(d):
return nil
}
}
// parseEvents decodes a JSON array of events from body. Malformed input is
// logged and yields nil rather than an error, so a bad page is just skipped.
func parseEvents(body []byte) []Event {
	var decoded []Event
	if err := json.Unmarshal(body, &decoded); err != nil {
		log.Printf("Failed to unmarshal events: %v", err)
		return nil
	}
	return decoded
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment