Skip to content

Commit

Permalink
also retrofit query cmd.go to be consistent with others
Browse files Browse the repository at this point in the history
  • Loading branch information
pickledish committed Dec 14, 2023
1 parent cf4ab75 commit 52c7cb3
Showing 1 changed file with 68 additions and 54 deletions.
122 changes: 68 additions & 54 deletions cmd/query/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log"
"math/rand"
"net/http"
"os"
"os/signal"
Expand All @@ -12,12 +13,11 @@ import (

"cbnr/util"

"github.com/spf13/cobra"

"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/go-chi/cors"
"github.com/go-chi/jwtauth/v5"
"github.com/spf13/cobra"

"github.com/prometheus/client_golang/prometheus/promhttp"
promHttpMetrics "github.com/slok/go-http-metrics/metrics/prometheus"
Expand All @@ -30,69 +30,73 @@ var QueryCmd = &cobra.Command{
Short: "query - starts an API server which can be queried for data",
Run: func(cmd *cobra.Command, args []string) {

log.Printf("STARTING")
log.Printf("[INFO] Starting up...")

// ------------------------------------------------------------------------

log.Printf("[INFO] Seeding randomness for generating IDs...")

rand.Seed(time.Now().UnixNano())

// ------------------------------------------------------------------------

log.Printf("[INFO] Registering int handlers for graceful shutdown...")

// set up channel to handle graceful shutdown
done := make(chan os.Signal, 1)
signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)

httpPort, err := util.GetEnvConfig("HTTP_LISTENER_PORT")
if err != nil {
log.Fatalf("Unable to get http listener port from environment: %v", err)
}
// ------------------------------------------------------------------------

metricsPort, err := util.GetEnvConfig("METRICS_LISTENER_PORT")
if err != nil {
log.Fatalf("Unable to get metrics listener port from environment: %v", err)
}
log.Printf("[INFO] Getting configs from environment...")

influxUrl, err := util.GetEnvConfig("INFLUX_URL")
if err != nil {
log.Fatalf("Unable to get influx location from environment: %v", err)
configNames := []string{
"HTTP_LISTENER_PORT",
"METRICS_LISTENER_PORT",
"INFLUX_URL",
"INFLUX_DB_NAME",
"CORS_ALLOWED_ORIGIN",
"PERMISSIVE_MODE",
"JWT_SECRET",
}

influxDbName, err := util.GetEnvConfig("INFLUX_DB_NAME")
config, err := util.GetEnvConfigs(configNames)
if err != nil {
log.Fatalf("Unable to get influx DB name from environment: %v", err)
log.Fatalf("[ERROR] Could not parse configs from environment: %v", err)
}

i := InfluxClient{influxUrl, influxDbName}
// ------------------------------------------------------------------------

log.Printf("INFLUX PARAMS PARSED, SET UP STRUCT")

corsOrigin, err := util.GetEnvConfig("CORS_ALLOWED_ORIGIN")
if err != nil {
log.Fatalf("Unable to get CORS allowed origin from environment: %v", err)
}
log.Printf("[INFO] Setting up middlewares...")

corsOptions := cors.Options{
AllowedOrigins: []string{corsOrigin},
AllowedOrigins: []string{config["CORS_ALLOWED_ORIGIN"]},
AllowedMethods: []string{"GET", "OPTIONS"},
AllowedHeaders: []string{"Accept", "Authorization", "Content-Type"},
AllowCredentials: true,
}

permissiveStr, err := util.GetEnvConfig("PERMISSIVE_MODE")
if err != nil {
log.Fatalf("Unable to get permissive mode status from environment: %v", err)
}

permissive := permissiveStr == "true"

jwtSecretStr, err := util.GetEnvConfig("JWT_SECRET")
if err != nil {
log.Fatalf("Unable to get JWT secret token from environment: %v", err)
}

// as soon as supabase supports RS256 / asymmetric JWT encryption, get this
// out of here and replace with the public key just for validation
// as soon as supabase supports RS256 / asymmetric JWT encryption, get
// this out of here and replace with the public key just for validation
// https://github.com/orgs/supabase/discussions/4059
jwtSecret := jwtauth.New("HS256", []byte(jwtSecretStr), nil)
jwtSecret := jwtauth.New("HS256", []byte(config["JWT_SECRET"]), nil)

promMiddleware := promHttpMiddleware.New(promHttpMiddleware.Config{
Recorder: promHttpMetrics.NewRecorder(promHttpMetrics.Config{}),
})

// ------------------------------------------------------------------------

log.Printf("[INFO] Setting up influx client...")

i := InfluxClient{
InfluxUrl: config["INFLUX_URL"],
InfluxDbName: config["INFLUX_DB_NAME"],
}

// ------------------------------------------------------------------------

log.Printf("[INFO] Registering middlewares and handlers...")

r := chi.NewRouter()

r.Use(middleware.Recoverer)
Expand All @@ -101,52 +105,62 @@ var QueryCmd = &cobra.Command{
r.Use(middleware.Timeout(time.Second))
r.Use(cors.Handler(corsOptions))
r.Use(jwtauth.Verifier(jwtSecret))
r.Use(util.CheckJwtMiddleware(permissive, false))
r.Use(util.CheckHostnameMiddleware(permissive))
r.Use(util.CheckJwtMiddleware((config["PERMISSIVE_MODE"] == "true"), false))
r.Use(util.CheckHostnameMiddleware(config["PERMISSIVE_MODE"] == "true"))
r.Use(promHttpStd.HandlerProvider("", promMiddleware))

r.Get("/query", i.HandleQuery)

log.Printf("MIDDLEWARES SET UP, STARTING SERVERS")
// ------------------------------------------------------------------------

log.Printf("[INFO] Trying to listen %v...", config["HTTP_LISTENER_PORT"])

primary := http.Server{
Addr: fmt.Sprintf(":%v", config["HTTP_LISTENER_PORT"]),
Handler: r,
}

primary := http.Server{Addr: fmt.Sprintf(":%v", httpPort), Handler: r}
metrics := http.Server{Addr: fmt.Sprintf(":%v", metricsPort), Handler: promhttp.Handler()}
metrics := http.Server{
Addr: fmt.Sprintf(":%v", config["METRICS_LISTENER_PORT"]),
Handler: promhttp.Handler(),
}

go func() {
log.Printf("PRIMARY SERVER LISTENING ON %v", httpPort)
err := primary.ListenAndServe()
if err != nil {
log.Fatalf("Error while serving primary handler: %v", err)
log.Fatalf("[ERROR] Primary server could not start: %v", err)
}
}()

go func() {
log.Printf("METRICS SERVER LISTENING ON %v", metricsPort)
err := metrics.ListenAndServe()
if err != nil {
log.Fatalf("Error while serving metrics handler: %v", err)
log.Fatalf("[ERROR] Metrics server could not start: %v", err)
}
}()

// block here until we get some sort of interrupt or kill
// ------------------------------------------------------------------------

log.Printf("[INFO] Listening! Main thread now waiting for interrupt...")

<-done

log.Printf("GOT SIGNAL TO DIE, cleaning up...")
log.Printf("[INFO] Got signal to die, cleaning up...")

ctx := context.Background()
ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()

err = primary.Shutdown(ctxTimeout)
if err != nil {
log.Fatalf("Could not cleanly shut down primary server: %v", err)
log.Fatalf("[ERROR] Could not cleanly shut down primary server: %v", err)
}

err = metrics.Shutdown(ctxTimeout)
if err != nil {
log.Fatalf("Could not cleanly shut down metrics server: %v", err)
log.Fatalf("[ERROR] Could not cleanly shut down metrics server: %v", err)
}

log.Printf("ALL DONE, GOODBYE")
log.Printf("[INFO] ALL DONE, GOODBYE")
},
}

0 comments on commit 52c7cb3

Please sign in to comment.