Skip to content

Commit

Permalink
1.部分组件New方法出现错误不再panic而是返回错误,2.orm结构调整并新增测试用例
Browse files Browse the repository at this point in the history
  • Loading branch information
keepchen committed May 10, 2024
1 parent ea3a596 commit 98360f0
Show file tree
Hide file tree
Showing 12 changed files with 397 additions and 138 deletions.
40 changes: 31 additions & 9 deletions lib/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ var dbInstance *Instance
func InitDB(conf Conf) {
dialectR, dialectW := conf.GenDialector()
//read instance
dbPtrR := initDB(conf, dialectR)
dbPtrR := mustInitDB(conf, dialectR)
//write instance
dbPtrW := initDB(conf, dialectW)
dbPtrW := mustInitDB(conf, dialectW)

dbInstance = &Instance{
R: dbPtrR,
Expand All @@ -33,14 +33,15 @@ func InitDB(conf Conf) {
// NewFreshDB 实例化全新的数据库链接
//
// rInstance为读实例,wInstance为写实例
func NewFreshDB(conf Conf) (rInstance, wInstance *gorm.DB) {
func NewFreshDB(conf Conf) (rInstance *gorm.DB, rErr error, wInstance *gorm.DB, wErr error) {
dialectR, dialectW := conf.GenDialector()
rInstance, wInstance = initDB(conf, dialectR), initDB(conf, dialectW)
rInstance, rErr = initDB(conf, dialectR)
wInstance, wErr = initDB(conf, dialectW)

return
}

func initDB(conf Conf, dialect gorm.Dialector) *gorm.DB {
func mustInitDB(conf Conf, dialect gorm.Dialector) *gorm.DB {
loggerSvc := NewZapLoggerForGorm(logger.GetLogger(), conf)
loggerSvc.SetAsDefault()
dbPtr, err := gorm.Open(dialect, &gorm.Config{
Expand All @@ -63,6 +64,29 @@ func initDB(conf Conf, dialect gorm.Dialector) *gorm.DB {
return dbPtr
}

func initDB(conf Conf, dialect gorm.Dialector) (*gorm.DB, error) {
loggerSvc := NewZapLoggerForGorm(logger.GetLogger(), conf)
loggerSvc.SetAsDefault()
dbPtr, err := gorm.Open(dialect, &gorm.Config{
Logger: loggerSvc,
})
if err != nil {
return nil, err
}

sqlDB, err := dbPtr.DB()
if err != nil {
return nil, err
}

sqlDB.SetMaxOpenConns(conf.ConnectionPool.MaxOpenConnCount)
sqlDB.SetMaxIdleConns(conf.ConnectionPool.MaxIdleConnCount)
sqlDB.SetConnMaxLifetime(time.Minute * time.Duration(conf.ConnectionPool.ConnMaxLifeTimeMinutes))
sqlDB.SetConnMaxIdleTime(time.Minute * time.Duration(conf.ConnectionPool.ConnMaxIdleTimeMinutes))

return dbPtr, nil
}

// GetInstance 获取数据库实例
//
// 获取由InitDB实例化后的连接
Expand All @@ -73,10 +97,8 @@ func GetInstance() *Instance {
// New 初始化化全新的数据库链接
//
// rInstance为读实例,wInstance为写实例
func New(conf Conf) (rInstance, wInstance *gorm.DB) {
rInstance, wInstance = NewFreshDB(conf)

return
func New(conf Conf) (rInstance *gorm.DB, rErr error, wInstance *gorm.DB, wErr error) {
return NewFreshDB(conf)
}

// Init 初始化数据库连接
Expand Down
26 changes: 14 additions & 12 deletions lib/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,14 @@ func Init(conf Conf, topic, groupID string) {
// New 初始化连接
//
// 该方法会初始化连接、读实例、写实例
func New(conf Conf, topic, groupID string) ([]*kafkaLib.Conn, *kafkaLib.Writer, *kafkaLib.Reader) {
connections := NewConnections(conf)
writer := NewWriter(conf, topic)
reader := NewReader(conf, topic, groupID)

return connections, writer, reader
func New(conf Conf, topic, groupID string) (connections []*kafkaLib.Conn,
writer *kafkaLib.Writer, wErr error,
reader *kafkaLib.Reader, rErr error) {
connections = NewConnections(conf)
writer, wErr = NewWriter(conf, topic)
reader, rErr = NewReader(conf, topic, groupID)

return
}

// InitConnections 初始化连接
Expand Down Expand Up @@ -175,7 +177,7 @@ func InitWriter(conf Conf, topic string) {
}

// NewWriter 实例化新的写实例
func NewWriter(conf Conf, topic string) *kafkaLib.Writer {
func NewWriter(conf Conf, topic string) (*kafkaLib.Writer, error) {
writer := &kafkaLib.Writer{
Addr: kafkaLib.TCP(conf.Endpoints...),
Topic: topic,
Expand All @@ -189,14 +191,14 @@ func NewWriter(conf Conf, topic string) *kafkaLib.Writer {
if len(conf.Username) != 0 && len(conf.Password) != 0 {
mechanism, mErr := getMechanism(conf)
if mErr != nil {
panic(mErr)
return nil, mErr
}
transport.SASL = mechanism
}

writer.Transport = transport

return writer
return writer, nil
}

// InitReader 初始化读实例
Expand Down Expand Up @@ -236,7 +238,7 @@ func InitReader(conf Conf, topic, groupID string) {
}

// NewReader 实例化新的读实例
func NewReader(conf Conf, topic, groupID string) *kafkaLib.Reader {
func NewReader(conf Conf, topic, groupID string) (*kafkaLib.Reader, error) {
if conf.Timeout < 1 {
conf.Timeout = 10000
}
Expand All @@ -252,7 +254,7 @@ func NewReader(conf Conf, topic, groupID string) *kafkaLib.Reader {
if len(conf.Username) != 0 && len(conf.Password) != 0 {
mechanism, mErr := getMechanism(conf)
if mErr != nil {
panic(mErr)
return nil, mErr
}
dialer.SASLMechanism = mechanism
}
Expand All @@ -264,7 +266,7 @@ func NewReader(conf Conf, topic, groupID string) *kafkaLib.Reader {
Dialer: dialer,
})

return reader
return reader, nil
}

// 根据SASL授权类型获取认证装置
Expand Down
28 changes: 24 additions & 4 deletions lib/logger/zap.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,35 +219,55 @@ func exporterProvider(cfg Conf) zapcore.WriteSyncer {

switch strings.ToLower(cfg.Exporter.Provider) {
case "redis":
redisInstance, err := redis.New(cfg.Exporter.Redis.ConnConf)
if err != nil {
log.Println("[logger] using (redis) exporter, but initialize connection error: ", err)
return writer
}
redisWriter := &redisWriterStd{
cli: redis.New(cfg.Exporter.Redis.ConnConf),
cli: redisInstance,
listKey: cfg.Exporter.Redis.ListKey,
}

writer = redisWriter
log.Println("[logger] using (redis) exporter")
return writer
case "redis-cluster":
redisInstance, err := redis.NewCluster(cfg.Exporter.Redis.ClusterConnConf)
if err != nil {
log.Println("[logger] using (redis-cluster) exporter, but initialize connection error: ", err)
return writer
}
redisWriter := &redisClusterWriterStd{
cli: redis.NewCluster(cfg.Exporter.Redis.ClusterConnConf),
cli: redisInstance,
listKey: cfg.Exporter.Redis.ListKey,
}

writer = redisWriter
log.Println("[logger] using (redis-cluster) exporter")
return writer
case "nats":
natsInstance, err := nats.New(cfg.Exporter.Nats.ConnConf)
if err != nil {
log.Println("[logger] using (nats) exporter, but initialize connection error: ", err)
return writer
}
natsWriter := &natsWriterStd{
cli: nats.New(cfg.Exporter.Nats.ConnConf),
cli: natsInstance,
subjectKey: cfg.Exporter.Nats.Subject,
}

writer = natsWriter
log.Println("[logger] using (nats) exporter")
return writer
case "kafka":
kafkaInstance, err := kafka.NewWriter(cfg.Exporter.Kafka.ConnConf, cfg.Exporter.Kafka.Topic)
if err != nil {
log.Println("[logger] using (kafka) exporter, but initialize writer error: ", err)
return writer
}
kafkaWriter := &kafkaWriterStd{
writer: kafka.NewWriter(cfg.Exporter.Kafka.ConnConf, cfg.Exporter.Kafka.Topic),
writer: kafkaInstance,
topic: cfg.Exporter.Kafka.Topic,
}

Expand Down
17 changes: 14 additions & 3 deletions lib/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ var natsInstance *natsLib.Conn

// Init 初始化
func Init(conf Conf) {
conn := initNats(conf)
conn := mustInitNats(conf)

natsInstance = conn
}

func initNats(conf Conf) *natsLib.Conn {
func mustInitNats(conf Conf) *natsLib.Conn {
var opts natsLib.Option
if len(conf.Username) != 0 && len(conf.Password) != 0 {
opts = natsLib.UserInfo(conf.Username, conf.Password)
Expand All @@ -30,12 +30,23 @@ func initNats(conf Conf) *natsLib.Conn {
return conn
}

func initNats(conf Conf) (*natsLib.Conn, error) {
var opts natsLib.Option
if len(conf.Username) != 0 && len(conf.Password) != 0 {
opts = natsLib.UserInfo(conf.Username, conf.Password)
}

conn, err := natsLib.Connect(strings.Join(conf.Endpoints, ","), opts)

return conn, err
}

// GetInstance 获取链接实例
func GetInstance() *natsLib.Conn {
return natsInstance
}

// New 初始化新的nats实例
func New(conf Conf) *natsLib.Conn {
func New(conf Conf) (*natsLib.Conn, error) {
return initNats(conf)
}
27 changes: 24 additions & 3 deletions lib/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ var redisInstance *redisLib.Client

// InitRedis 初始化redis连接
func InitRedis(conf Conf) {
rdb := initRedis(conf)
rdb := mustInitRedis(conf)

redisInstance = rdb
}
Expand All @@ -24,7 +24,7 @@ func GetInstance() *redisLib.Client {
return redisInstance
}

func initRedis(conf Conf) *redisLib.Client {
func mustInitRedis(conf Conf) *redisLib.Client {
opts := &redisLib.Options{
Addr: fmt.Sprintf("%s:%d", conf.Host, conf.Port),
Username: conf.Username,
Expand All @@ -48,7 +48,28 @@ func initRedis(conf Conf) *redisLib.Client {
return rdb
}

func initRedis(conf Conf) (*redisLib.Client, error) {
opts := &redisLib.Options{
Addr: fmt.Sprintf("%s:%d", conf.Host, conf.Port),
Username: conf.Username,
Password: conf.Password,
DB: conf.Database,
}
if conf.SSLEnable {
//https://redis.uptrace.dev/guide/go-redis.html#using-tls
//
//To enable TLS/SSL, you need to provide an empty tls.Config.
//If you're using private certs, you need to specify them in the tls.Config
opts.TLSConfig = &tls.Config{}
}
rdb := redisLib.NewClient(opts)

err := rdb.Ping(context.Background()).Err()

return rdb, err
}

// New 实例化新的实例
func New(conf Conf) *redisLib.Client {
func New(conf Conf) (*redisLib.Client, error) {
return initRedis(conf)
}
46 changes: 43 additions & 3 deletions lib/redis/redis_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var redisClusterInstance *redisLib.ClusterClient

// InitRedisCluster 初始化redis集群连接
func InitRedisCluster(conf ClusterConf) {
rdb := initRedisCluster(conf)
rdb := mustInitRedisCluster(conf)

redisClusterInstance = rdb
}
Expand All @@ -28,7 +28,7 @@ func GetClusterInstance() *redisLib.ClusterClient {
return redisClusterInstance
}

func initRedisCluster(conf ClusterConf) *redisLib.ClusterClient {
func mustInitRedisCluster(conf ClusterConf) *redisLib.ClusterClient {
var (
endpoints = make([]string, len(conf.Endpoints))
username string
Expand Down Expand Up @@ -71,7 +71,47 @@ func initRedisCluster(conf ClusterConf) *redisLib.ClusterClient {
return rdb
}

func initRedisCluster(conf ClusterConf) (*redisLib.ClusterClient, error) {
var (
endpoints = make([]string, len(conf.Endpoints))
username string
password string
)
for i := 0; i < len(conf.Endpoints); i++ {
endpoints[i] = fmt.Sprintf("%s:%d", conf.Endpoints[i].Host, conf.Endpoints[i].Port)
if len(conf.Endpoints[i].Password) != 0 {
password = conf.Endpoints[i].Password
}
if len(conf.Endpoints[i].Username) != 0 {
username = conf.Endpoints[i].Username
}
}
opts := &redisLib.ClusterOptions{
Addrs: endpoints,
Username: username,
Password: password,
MaxRedirects: len(conf.Endpoints) - 1,
}
if opts.MaxRedirects < 3 {
opts.MaxRedirects = 3
}
if conf.SSLEnable {
//https://redis.uptrace.dev/guide/go-redis.html#using-tls
//
//To enable TLS/SSL, you need to provide an empty tls.Config.
//If you're using private certs, you need to specify them in the tls.Config
opts.TLSConfig = &tls.Config{}
}
rdb := redisLib.NewClusterClient(opts)

err := rdb.ForEachShard(context.Background(), func(ctx context.Context, shard *redisLib.Client) error {
return shard.Ping(ctx).Err()
})

return rdb, err
}

// NewCluster 实例化新的实例
func NewCluster(conf ClusterConf) *redisLib.ClusterClient {
func NewCluster(conf ClusterConf) (*redisLib.ClusterClient, error) {
return initRedisCluster(conf)
}
2 changes: 1 addition & 1 deletion orm/model/base.go → orm/base.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package model
package orm

import (
"time"
Expand Down
Loading

0 comments on commit 98360f0

Please sign in to comment.