qkeeper/registry.go
Aleksandr Berkuta 5806fe84c4 init commit
2026-03-27 20:27:56 +03:00

211 lines
4.8 KiB
Go

package main
import (
"encoding/json"
"log"
"sort"
"sync"
"time"
bolt "go.etcd.io/bbolt"
)
// Storage and retention settings for the queue registry.
const (
	// bucketQueues names the bbolt bucket that holds one record per queue.
	bucketQueues = "queues"

	// inactivityTTL is how long a queue may go without activity before it is
	// treated as expired (14 days).
	inactivityTTL time.Duration = 14 * 24 * time.Hour

	// cleanupInterval is the period of the background expiry sweep.
	cleanupInterval time.Duration = time.Hour
)
// QueueMeta is the per-queue record stored in bbolt, encoded as JSON and keyed
// by ID.
type QueueMeta struct {
	ID   string `json:"id"`
	Name string `json:"name"`

	// Timestamps are Unix milliseconds.
	CreatedAt    int64 `json:"createdAt"`
	LastActiveAt int64 `json:"lastActiveAt"`
}
// QueueSummary is the per-queue entry returned by GET /queues.
type QueueSummary struct {
	ID        string `json:"id"`
	Name      string `json:"name"`
	CreatedAt int64  `json:"createdAt"`

	// Count is read live from the queue's in-memory hub; it is not persisted.
	Count int `json:"count"`
}
// QueueInstance is the in-memory, running form of a single queue: its
// persisted metadata plus the hub and state that serve it.
type QueueInstance struct {
	QueueMeta // embedded so ID, Name and the timestamps are promoted

	hub   *Hub        // started by Registry.newInstance, stopped by Registry.drop
	state *QueueState // created with the queue's name
}
// Registry holds every live queue instance together with the bbolt database
// that persists their metadata.
type Registry struct {
	mu     sync.RWMutex              // guards queues
	queues map[string]*QueueInstance // live instances keyed by queue ID
	db     *bolt.DB                  // persistent store for QueueMeta records
}
// openRegistry opens the bbolt database at path (with a 2-second open
// timeout), makes sure the queues bucket exists, loads the persisted queues
// into memory and starts the background cleanup loop.
//
// If any step after opening the database fails, the database is closed again
// and the error is returned.
func openRegistry(path string) (*Registry, error) {
	opts := &bolt.Options{Timeout: 2 * time.Second}
	db, err := bolt.Open(path, 0600, opts)
	if err != nil {
		return nil, err
	}

	ensureBucket := func(tx *bolt.Tx) error {
		_, bucketErr := tx.CreateBucketIfNotExists([]byte(bucketQueues))
		return bucketErr
	}
	if err = db.Update(ensureBucket); err != nil {
		db.Close()
		return nil, err
	}

	reg := &Registry{
		queues: make(map[string]*QueueInstance),
		db:     db,
	}
	if err = reg.load(); err != nil {
		db.Close()
		return nil, err
	}

	go reg.cleanupLoop()
	return reg, nil
}
// load reads every persisted queue from bbolt and starts an in-memory
// instance for each one whose LastActiveAt is still within inactivityTTL.
//
// Corrupt records are logged and skipped. Expired records are not
// instantiated; because runCleanup only walks the in-memory map it would never
// see them, so they are deleted here in a follow-up write transaction. That
// deletion is best-effort: a failure is logged and does not fail the load.
//
// It returns an error only if the read transaction itself fails.
func (r *Registry) load() error {
	var expired [][]byte

	err := r.db.View(func(tx *bolt.Tx) error {
		return tx.Bucket([]byte(bucketQueues)).ForEach(func(k, v []byte) error {
			var m QueueMeta
			if err := json.Unmarshal(v, &m); err != nil {
				log.Printf("skipping corrupt queue %s: %v", k, err)
				return nil
			}
			if time.Since(time.UnixMilli(m.LastActiveAt)) > inactivityTTL {
				// k is only valid while the transaction is open, so keep a copy.
				expired = append(expired, append([]byte(nil), k...))
				return nil
			}
			r.queues[m.ID] = r.newInstance(m)
			return nil
		})
	})
	if err != nil || len(expired) == 0 {
		return err
	}

	// Purge expired records now; nothing else will ever look at them.
	purgeErr := r.db.Update(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte(bucketQueues))
		for _, k := range expired {
			if err := b.Delete(k); err != nil {
				return err
			}
		}
		return nil
	})
	if purgeErr != nil {
		log.Printf("removing expired queues from db: %v", purgeErr)
	}
	return nil
}
// newInstance builds the runtime object for the queue described by m: a fresh
// hub and a QueueState carrying the queue's name. The hub's run loop is
// started on its own goroutine before the instance is returned.
//
// The instance is not added to r.queues here; callers do that.
func (r *Registry) newInstance(m QueueMeta) *QueueInstance {
	inst := &QueueInstance{
		QueueMeta: m,
		hub:       newHub(),
		state:     &QueueState{Name: m.Name},
	}
	go inst.hub.run(inst.state, r, m.ID)
	return inst
}
// createIDAttempts bounds how many random IDs Create tries before giving up.
const createIDAttempts = 16

// queueIDError is returned by Create when every generated ID was already in
// use.
type queueIDError struct{}

// Error implements the error interface.
func (queueIDError) Error() string { return "could not generate a unique queue id" }

// Create makes a new queue called name, persists it, starts its instance and
// registers it. CreatedAt and LastActiveAt are both stamped with msNow().
//
// The ID is random, so it can collide with a live queue. The ID is therefore
// chosen under the write lock and re-drawn (up to createIDAttempts times)
// while it is already present in r.queues; otherwise the new queue would
// silently replace the existing one in both the database and the map. The lock
// is held across the database write so that no concurrent Create can claim the
// same ID in between.
//
// It returns the new instance, or an error if no free ID was found or the
// record could not be persisted; in both cases nothing is registered.
func (r *Registry) Create(name string) (*QueueInstance, error) {
	now := msNow()
	m := QueueMeta{
		Name:         name,
		CreatedAt:    now,
		LastActiveAt: now,
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	unique := false
	for attempt := 0; attempt < createIDAttempts && !unique; attempt++ {
		m.ID = randHex(4) // 8 hex chars — short but hard to guess
		_, taken := r.queues[m.ID]
		unique = !taken
	}
	if !unique {
		return nil, queueIDError{}
	}

	if err := r.persist(m); err != nil {
		return nil, err
	}
	inst := r.newInstance(m)
	r.queues[m.ID] = inst
	return inst, nil
}
// Get looks up the live queue instance registered under id. It returns nil
// when no such queue is registered.
func (r *Registry) Get(id string) *QueueInstance {
	r.mu.RLock()
	inst := r.queues[id]
	r.mu.RUnlock()
	return inst
}
// List returns a snapshot summary of all live queues, sorted by creation time.
// Queues with the same CreatedAt are ordered by ID so that the result does not
// depend on map iteration order.
//
// Count is read from each queue's hub at the moment of the call. The returned
// slice is never nil.
func (r *Registry) List() []QueueSummary {
	r.mu.RLock()
	defer r.mu.RUnlock()

	out := make([]QueueSummary, 0, len(r.queues))
	for _, inst := range r.queues {
		out = append(out, QueueSummary{
			ID:        inst.ID,
			Name:      inst.Name,
			CreatedAt: inst.CreatedAt,
			Count:     int(inst.hub.activeCount.Load()),
		})
	}

	sort.Slice(out, func(i, j int) bool {
		if out[i].CreatedAt != out[j].CreatedAt {
			return out[i].CreatedAt < out[j].CreatedAt
		}
		return out[i].ID < out[j].ID
	})
	return out
}
// Touch records activity on queue id by setting its LastActiveAt to the
// current time (called on join). Unknown IDs are ignored.
//
// The in-memory value is updated synchronously under the registry lock. The
// bbolt record is updated on a separate goroutine and is best-effort: a failed
// write is logged and otherwise ignored.
func (r *Registry) Touch(id string) {
	now := msNow()

	r.mu.Lock()
	inst, ok := r.queues[id]
	if ok {
		inst.LastActiveAt = now
	}
	r.mu.Unlock()
	if !ok {
		return
	}

	go func() {
		err := r.db.Update(func(tx *bolt.Tx) error {
			b := tx.Bucket([]byte(bucketQueues))
			raw := b.Get([]byte(id))
			if raw == nil {
				// The queue was dropped after the in-memory update. Writing
				// here would resurrect it as a record with no ID or name.
				return nil
			}
			var m QueueMeta
			if err := json.Unmarshal(raw, &m); err != nil {
				// Leave a corrupt record alone rather than replacing it with a
				// mostly-empty one.
				return err
			}
			if m.LastActiveAt >= now {
				// These writes are asynchronous and may land out of order;
				// never move the stored timestamp backwards.
				return nil
			}
			m.LastActiveAt = now
			updated, err := json.Marshal(m)
			if err != nil {
				return err
			}
			return b.Put([]byte(id), updated)
		})
		if err != nil {
			log.Printf("touching queue %s: %v", id, err)
		}
	}()
}
// persist writes m to the queues bucket as JSON under the key m.ID, replacing
// any record already stored under that key.
//
// The record is encoded before the write transaction is opened so the
// transaction stays short. It returns the encoding error or the bbolt error,
// if any.
func (r *Registry) persist(m QueueMeta) error {
	raw, err := json.Marshal(m)
	if err != nil {
		return err
	}
	return r.db.Update(func(tx *bolt.Tx) error {
		return tx.Bucket([]byte(bucketQueues)).Put([]byte(m.ID), raw)
	})
}
// drop removes queue id from the registry: it is deleted from the in-memory
// map, its hub (if it was registered) is shut down, and its record is deleted
// from bbolt.
//
// The database delete is attempted even when the queue was not in memory. A
// failed delete is logged rather than returned, since drop has no error
// result.
func (r *Registry) drop(id string) {
	r.mu.Lock()
	inst, ok := r.queues[id]
	delete(r.queues, id) // no-op when id is absent
	r.mu.Unlock()

	// Shut the hub down outside the lock so it cannot block other registry
	// operations.
	if ok {
		inst.hub.shutdown()
	}

	err := r.db.Update(func(tx *bolt.Tx) error {
		return tx.Bucket([]byte(bucketQueues)).Delete([]byte(id))
	})
	if err != nil {
		log.Printf("deleting queue %s from db: %v", id, err)
	}
}
// cleanupLoop runs runCleanup once per cleanupInterval. It blocks forever and
// is meant to run on its own goroutine (openRegistry starts it).
func (r *Registry) cleanupLoop() {
	ticker := time.NewTicker(cleanupInterval)
	defer ticker.Stop()

	for {
		<-ticker.C
		r.runCleanup()
	}
}
// runCleanup performs one expiry sweep: it collects the IDs of all in-memory
// queues whose LastActiveAt is older than inactivityTTL, then drops each one.
//
// IDs are gathered under the read lock and dropped after it is released,
// because drop takes the write lock itself.
func (r *Registry) runCleanup() {
	stale := func(inst *QueueInstance) bool {
		return time.Since(time.UnixMilli(inst.LastActiveAt)) > inactivityTTL
	}

	var victims []string
	r.mu.RLock()
	for id, inst := range r.queues {
		if !stale(inst) {
			continue
		}
		victims = append(victims, id)
	}
	r.mu.RUnlock()

	for i := range victims {
		log.Printf("removing inactive queue %s", victims[i])
		r.drop(victims[i])
	}
}