package main

import (
	"encoding/json"
	"log"
	"sort"
	"sync"
	"time"

	bolt "go.etcd.io/bbolt"
)

const (
	// bucketQueues names the bbolt bucket that holds one record per queue.
	bucketQueues = "queues"
	// inactivityTTL is how long a queue may go without activity before it
	// is treated as expired (14 days).
	inactivityTTL = 14 * 24 * time.Hour
	// cleanupInterval is the period of the background expiry sweep.
	cleanupInterval = time.Hour
)

// QueueMeta is the per-queue record stored (as JSON) in bbolt.
type QueueMeta struct {
	ID           string `json:"id"`
	Name         string `json:"name"`
	CreatedAt    int64  `json:"createdAt"`    // Unix ms
	LastActiveAt int64  `json:"lastActiveAt"` // Unix ms
}

// QueueSummary is the shape returned by GET /queues.
type QueueSummary struct {
	ID        string `json:"id"`
	Name      string `json:"name"`
	CreatedAt int64  `json:"createdAt"`
	Count     int    `json:"count"` // live, from in-memory hub
}

// QueueInstance is the runtime object for one queue: its persisted metadata
// plus the in-memory hub and state that back it.
type QueueInstance struct {
	QueueMeta
	hub   *Hub
	state *QueueState
}

// Registry owns all queue instances and the bbolt handle they are persisted to.
type Registry struct {
	mu     sync.RWMutex // guards queues
	queues map[string]*QueueInstance
	db     *bolt.DB
}

// openRegistry opens (or creates) the bbolt file at path, makes sure the
// queues bucket exists, loads the persisted queues into memory and starts the
// background cleanup loop. On any failure after the file has been opened the
// database is closed again before the error is returned.
func openRegistry(path string) (*Registry, error) {
	// Give up after 2s if the database file lock cannot be obtained.
	opts := &bolt.Options{Timeout: 2 * time.Second}
	db, err := bolt.Open(path, 0600, opts)
	if err != nil {
		return nil, err
	}

	ensureBucket := func(tx *bolt.Tx) error {
		_, err := tx.CreateBucketIfNotExists([]byte(bucketQueues))
		return err
	}
	if err = db.Update(ensureBucket); err != nil {
		db.Close()
		return nil, err
	}

	reg := &Registry{
		queues: make(map[string]*QueueInstance),
		db:     db,
	}
	if err = reg.load(); err != nil {
		db.Close()
		return nil, err
	}

	go reg.cleanupLoop()
	return reg, nil
}

// load reads persisted queues from bbolt, skipping expired ones.
func (r *Registry) load() error { return r.db.View(func(tx *bolt.Tx) error { return tx.Bucket([]byte(bucketQueues)).ForEach(func(k, v []byte) error { var m QueueMeta if err := json.Unmarshal(v, &m); err != nil { log.Printf("skipping corrupt queue %s: %v", k, err) return nil } if time.Since(time.UnixMilli(m.LastActiveAt)) > inactivityTTL { return nil // expired; cleanup loop will remove from DB } r.queues[m.ID] = r.newInstance(m) return nil }) }) } func (r *Registry) newInstance(m QueueMeta) *QueueInstance { hub := newHub() state := &QueueState{Name: m.Name} inst := &QueueInstance{QueueMeta: m, hub: hub, state: state} go hub.run(state, r, m.ID) return inst } // Create makes a new named queue, persists it, and returns the instance. func (r *Registry) Create(name string) (*QueueInstance, error) { now := msNow() m := QueueMeta{ ID: randHex(4), // 8 hex chars — short but hard to guess Name: name, CreatedAt: now, LastActiveAt: now, } if err := r.persist(m); err != nil { return nil, err } inst := r.newInstance(m) r.mu.Lock() r.queues[m.ID] = inst r.mu.Unlock() return inst, nil } // Get returns a running queue instance by ID, or nil. func (r *Registry) Get(id string) *QueueInstance { r.mu.RLock() defer r.mu.RUnlock() return r.queues[id] } // List returns a snapshot summary of all queues, sorted by creation time. func (r *Registry) List() []QueueSummary { r.mu.RLock() defer r.mu.RUnlock() out := make([]QueueSummary, 0, len(r.queues)) for _, inst := range r.queues { out = append(out, QueueSummary{ ID: inst.ID, Name: inst.Name, CreatedAt: inst.CreatedAt, Count: int(inst.hub.activeCount.Load()), }) } sort.Slice(out, func(i, j int) bool { return out[i].CreatedAt < out[j].CreatedAt }) return out } // Touch updates LastActiveAt for a queue (called on join). // The DB write is async and best-effort. 
func (r *Registry) Touch(id string) { now := msNow() r.mu.Lock() inst, ok := r.queues[id] if ok { inst.LastActiveAt = now } r.mu.Unlock() if !ok { return } go func() { _ = r.db.Update(func(tx *bolt.Tx) error { b := tx.Bucket([]byte(bucketQueues)) var m QueueMeta if raw := b.Get([]byte(id)); raw != nil { _ = json.Unmarshal(raw, &m) } m.LastActiveAt = now raw, _ := json.Marshal(m) return b.Put([]byte(id), raw) }) }() } func (r *Registry) persist(m QueueMeta) error { return r.db.Update(func(tx *bolt.Tx) error { raw, _ := json.Marshal(m) return tx.Bucket([]byte(bucketQueues)).Put([]byte(m.ID), raw) }) } func (r *Registry) drop(id string) { r.mu.Lock() inst, ok := r.queues[id] if ok { delete(r.queues, id) } r.mu.Unlock() if ok { inst.hub.shutdown() } _ = r.db.Update(func(tx *bolt.Tx) error { return tx.Bucket([]byte(bucketQueues)).Delete([]byte(id)) }) } func (r *Registry) cleanupLoop() { t := time.NewTicker(cleanupInterval) defer t.Stop() for range t.C { r.runCleanup() } } func (r *Registry) runCleanup() { r.mu.RLock() var expired []string for id, inst := range r.queues { if time.Since(time.UnixMilli(inst.LastActiveAt)) > inactivityTTL { expired = append(expired, id) } } r.mu.RUnlock() for _, id := range expired { log.Printf("removing inactive queue %s", id) r.drop(id) } }