Commit c6d84c2f authored by Vishal Jain's avatar Vishal Jain
Browse files

Metadata server with pebble db.

Summary: Metadata server uses pebble db if the appropriate flag is set.

Test Plan: Manual testing

Reviewers: michelle, nserrino, vihang, #engineering

Reviewed By: vihang, #engineering

JIRA Issues: PP-2478

Differential Revision: https://phab.corp.pixielabs.ai/D7517

GitOrigin-RevId: 3cb06225f5aef73edf8284640a62059160f247c2
parent f4bf6f6e
Showing with 62 additions and 30 deletions
+62 -30
......@@ -54,7 +54,7 @@ build:
deploy:
kustomize:
paths:
- k8s/vizier/dev
- k8s/vizier/persistent_metadata
profiles:
- name: minikube
patches:
......
......@@ -17,7 +17,10 @@ go_library(
"//src/vizier/services/metadata/controllers:go_default_library",
"//src/vizier/services/metadata/metadataenv:go_default_library",
"//src/vizier/services/metadata/metadatapb:service_pl_go_proto",
"//src/vizier/utils/datastore:go_default_library",
"//src/vizier/utils/datastore/etcd:go_default_library",
"//src/vizier/utils/datastore/pebbledb:go_default_library",
"@com_github_cockroachdb_pebble//:go_default_library",
"@com_github_coreos_etcd//clientv3:go_default_library",
"@com_github_coreos_etcd//pkg/transport:go_default_library",
"@com_github_nats_io_nats_go//:go_default_library",
......
......@@ -6,6 +6,7 @@ import (
"net/http"
"time"
"github.com/cockroachdb/pebble"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/pkg/transport"
"github.com/nats-io/nats.go"
......@@ -22,12 +23,18 @@ import (
"pixielabs.ai/pixielabs/src/vizier/services/metadata/controllers"
"pixielabs.ai/pixielabs/src/vizier/services/metadata/metadataenv"
"pixielabs.ai/pixielabs/src/vizier/services/metadata/metadatapb"
"pixielabs.ai/pixielabs/src/vizier/utils/datastore"
"pixielabs.ai/pixielabs/src/vizier/utils/datastore/etcd"
"pixielabs.ai/pixielabs/src/vizier/utils/datastore/pebbledb"
)
const (
cacheFlushPeriod = 30 * time.Second
cacheClearPeriod = 1 * time.Minute
// pebbledbTTLDuration represents how often we evict from pebble.
pebbledbTTLDuration = 30 * time.Second
// pebbleOpenDir is where the files live in the directory.
pebbleOpenDir = "/metadata"
)
func init() {
......@@ -37,6 +44,44 @@ func init() {
pflag.Duration("renew_period", 5000, "Duration in ms of the time to wait to renew lease")
pflag.String("pod_namespace", "pl", "The namespace this pod runs in. Used for leader elections")
pflag.String("nats_url", "pl-nats", "The URL of NATS")
pflag.Bool("use_etcd_operator", false, "Whether the etcd operator should be used instead of the persistent version.")
}
func mustInitEtcdDatastore() (*etcd.DataStore, func()) {
var tlsConfig *tls.Config
if !viper.GetBool("disable_ssl") {
var err error
tlsConfig, err = etcdTLSConfig()
if err != nil {
log.WithError(err).Fatal("Failed to load SSL for ETCD")
}
}
// Connect to etcd.
etcdClient, err := clientv3.New(clientv3.Config{
Endpoints: []string{viper.GetString("md_etcd_server")},
DialTimeout: 5 * time.Second,
TLS: tlsConfig,
})
if err != nil {
log.WithError(err).Fatal("Failed to connect to etcd at " + viper.GetString("md_etcd_server"))
}
etcdMgr := controllers.NewEtcdManager(etcdClient)
etcdMgr.Run()
cleanupFunc := func() {
etcdMgr.Stop()
etcdClient.Close()
}
return etcd.New(etcdClient), cleanupFunc
}
func mustInitPebbleDatastore() *pebbledb.DataStore {
pebbleDb, err := pebble.Open(pebbleOpenDir, &pebble.Options{})
if err != nil {
log.WithError(err).Fatal("Failed to open pebble database.")
}
return pebbledb.New(pebbleDb, pebbledbTTLDuration)
}
func etcdTLSConfig() (*tls.Config, error) {
......@@ -64,31 +109,8 @@ func main() {
flush := services.InitDefaultSentry(viper.GetString("cluster_id"))
defer flush()
var tlsConfig *tls.Config
if !viper.GetBool("disable_ssl") {
var err error
tlsConfig, err = etcdTLSConfig()
if err != nil {
log.WithError(err).Fatal("Failed to load SSL for ETCD")
}
}
// Connect to etcd.
etcdClient, err := clientv3.New(clientv3.Config{
Endpoints: []string{viper.GetString("md_etcd_server")},
DialTimeout: 5 * time.Second,
TLS: tlsConfig,
})
if err != nil {
log.WithError(err).Fatal("Failed to connect to etcd at " + viper.GetString("md_etcd_server"))
}
defer etcdClient.Close()
etcdMgr := controllers.NewEtcdManager(etcdClient)
etcdMgr.Run()
defer etcdMgr.Stop()
var nc *nats.Conn
var err error
if viper.GetBool("disable_ssl") {
nc, err = nats.Connect(viper.GetString("nats_url"))
} else {
......@@ -138,17 +160,24 @@ func main() {
cancel()
}()
newEtcdDataStore := etcd.New(etcdClient)
var dataStore datastore.MultiGetterSetterDeleterCloser
var cleanupFunc func()
if viper.GetBool("use_etcd_operator") {
dataStore, cleanupFunc = mustInitEtcdDatastore()
defer cleanupFunc()
} else {
dataStore = mustInitPebbleDatastore()
}
defer dataStore.Close()
k8sMds := controllers.NewMetadataDatastore(newEtcdDataStore)
k8sMds := controllers.NewMetadataDatastore(dataStore)
// Listen for K8s metadata updates.
updateCh := make(chan *controllers.K8sResourceMessage)
mdh := controllers.NewK8sMetadataHandler(updateCh, k8sMds, nc)
k8sMc, err := controllers.NewK8sMetadataController(k8sMds, updateCh)
defer k8sMc.Stop()
ads := controllers.NewAgentDatastore(newEtcdDataStore, 24*time.Hour)
ads := controllers.NewAgentDatastore(dataStore, 24*time.Hour)
agtMgr := controllers.NewAgentManager(ads, mdh, nc)
schemaQuitCh := make(chan struct{})
......@@ -169,7 +198,7 @@ func main() {
}
}()
tds := controllers.NewTracepointDatastore(newEtcdDataStore)
tds := controllers.NewTracepointDatastore(dataStore)
// Initialize tracepoint handler.
tracepointMgr := controllers.NewTracepointManager(tds, agtMgr, 30*time.Second)
defer tracepointMgr.Close()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment