slog: Kafka handler
A Kafka handler for the slog Go library.
See also:
- slog-multi: slog.Handler chaining, fanout, routing, failover, load balancing...
- slog-formatter: slog attribute formatting
- slog-sampling: slog sampling policy
- slog-mock: slog.Handler for test purposes
HTTP middlewares:
- slog-gin: Gin middleware for slog logger
- slog-echo: Echo middleware for slog logger
- slog-fiber: Fiber middleware for slog logger
- slog-chi: Chi middleware for slog logger
- slog-http: net/http middleware for slog logger
Loggers:
- slog-zap: A slog handler for Zap
- slog-zerolog: A slog handler for Zerolog
- slog-logrus: A slog handler for Logrus
Log sinks:
- slog-datadog: A slog handler for Datadog
- slog-betterstack: A slog handler for Betterstack
- slog-rollbar: A slog handler for Rollbar
- slog-loki: A slog handler for Loki
- slog-sentry: A slog handler for Sentry
- slog-syslog: A slog handler for Syslog
- slog-logstash: A slog handler for Logstash
- slog-fluentd: A slog handler for Fluentd
- slog-graylog: A slog handler for Graylog
- slog-quickwit: A slog handler for Quickwit
- slog-slack: A slog handler for Slack
- slog-telegram: A slog handler for Telegram
- slog-mattermost: A slog handler for Mattermost
- slog-microsoft-teams: A slog handler for Microsoft Teams
- slog-webhook: A slog handler for Webhook
- slog-kafka: A slog handler for Kafka
- slog-nats: A slog handler for NATS
- slog-parquet: A slog handler for Parquet + Object Storage
- slog-channel: A slog handler for Go channels
🚀 Install
go get github.com/samber/slog-kafka/v2
Compatibility: go >= 1.21
No breaking changes will be made to exported APIs before v3.0.0.
💡 Usage
GoDoc: https://pkg.go.dev/github.com/samber/slog-kafka/v2
Handler options
type Option struct {
	// log level (default: debug)
	Level slog.Leveler

	// Kafka Writer
	KafkaWriter *kafka.Writer
	Timeout     time.Duration // default: 60s

	// optional: customize Kafka event builder
	Converter Converter
	// optional: custom marshaler
	Marshaler func(v any) ([]byte, error)
	// optional: fetch attributes from context
	AttrFromContext []func(ctx context.Context) []slog.Attr

	// optional: see slog.HandlerOptions
	AddSource   bool
	ReplaceAttr func(groups []string, a slog.Attr) slog.Attr
}
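As an illustration of the Marshaler field, here is a minimal sketch of a function with the matching signature, func(v any) ([]byte, error). It uses only the standard library and encodes JSON without HTML escaping. The name marshalNoHTMLEscape is hypothetical and not part of slog-kafka, and nothing here describes what the library's default marshaler does.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// marshalNoHTMLEscape is a hypothetical helper (not part of slog-kafka).
// Its signature matches the Option.Marshaler field shown above.
// It encodes v as JSON and leaves '<', '>' and '&' unescaped.
func marshalNoHTMLEscape(v any) ([]byte, error) {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	enc.SetEscapeHTML(false)
	if err := enc.Encode(v); err != nil {
		return nil, err
	}
	// Encoder.Encode appends a trailing newline; trim it so the
	// payload is a single compact JSON value.
	return bytes.TrimRight(buf.Bytes(), "\n"), nil
}

func main() {
	out, err := marshalNoHTMLEscape(map[string]string{"message": "a < b"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // prints {"message":"a < b"}
}
```

Such a function could then be passed as Marshaler: marshalNoHTMLEscape when building the Option value.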
Other global parameters:
slogkafka.SourceKey = "source"
slogkafka.ContextKey = "extra"
slogkafka.RequestKey = "request"
slogkafka.ErrorKeys = []string{"error", "err"}
slogkafka.RequestIgnoreHeaders = false
Supported attributes
The following attributes are interpreted by slogkafka.DefaultConverter:
Attribute name | slog.Kind | Underlying type |
---|---|---|
"user" | group (see below) | |
"error" | any | error |
"request" | any | *http.Request |
other attributes | * | |
Other attributes will be injected into the extra field.
The user attribute must be a slog.Group, e.g.:
slog.Group("user",
	slog.String("id", "user-123"),
	slog.String("username", "samber"),
	slog.Time("created_at", time.Now()),
)
Example
import (
	"context"
	"fmt"
	"log/slog"
	"time"

	slogkafka "github.com/samber/slog-kafka/v2"
	"github.com/segmentio/kafka-go"
)

func main() {
	// docker-compose up -d
	uri := "127.0.0.1:9092"

	dialer := &kafka.Dialer{
		Timeout:   10 * time.Second,
		DualStack: true,
	}

	conn, err := dialer.DialContext(context.Background(), "tcp", uri)
	if err != nil {
		panic(err)
	}

	err = conn.CreateTopics(kafka.TopicConfig{
		Topic:             "logs",
		NumPartitions:     12,
		ReplicationFactor: 1,
	})
	if err != nil {
		panic(err)
	}

	writer := kafka.NewWriter(kafka.WriterConfig{
		Brokers:     []string{uri},
		Topic:       "logs",
		Dialer:      dialer,
		Async:       true, // !
		Balancer:    &kafka.Hash{},
		MaxAttempts: 3,
		Logger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
			fmt.Printf(msg+"\n", args...)
		}),
		ErrorLogger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
			fmt.Printf(msg+"\n", args...)
		}),
	})

	defer writer.Close()
	defer conn.Close()

	logger := slog.New(slogkafka.Option{Level: slog.LevelDebug, KafkaWriter: writer}.NewKafkaHandler())
	logger = logger.With("release", "v1.0.0")

	logger.
		With(
			slog.Group("user",
				slog.String("id", "user-123"),
				slog.Time("created_at", time.Now()),
			),
		).
		With("error", fmt.Errorf("an error")).
		Error("a message")
}
Kafka message:
{
	"level": "ERROR",
	"logger": "samber/slog-kafka",
	"message": "a message",
	"timestamp": "2023-04-30T01:33:21.676768Z",
	"error": {
		"error": "an error",
		"kind": "*errors.errorString",
		"stack": null
	},
	"extra": {
		"release": "v1.0.0"
	},
	"user": {
		"created_at": "2023-04-30T01:33:21.676704Z",
		"id": "user-123"
	}
}
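On the consumer side, a message like the one above can be decoded with the standard library. The following is a rough sketch: the LogEvent and LogError types and the decodeLogEvent function are hypothetical, modeled only on the sample payload, and are not types exported by slog-kafka. Real messages may carry other fields depending on the converter in use.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// LogError mirrors the "error" object of the sample payload (assumed shape).
type LogError struct {
	Error string  `json:"error"`
	Kind  string  `json:"kind"`
	Stack *string `json:"stack"`
}

// LogEvent mirrors the sample Kafka message (assumed shape).
type LogEvent struct {
	Level     string         `json:"level"`
	Logger    string         `json:"logger"`
	Message   string         `json:"message"`
	Timestamp time.Time      `json:"timestamp"`
	Error     *LogError      `json:"error,omitempty"`
	Extra     map[string]any `json:"extra,omitempty"`
	User      map[string]any `json:"user,omitempty"`
}

// decodeLogEvent parses one message value into a LogEvent.
func decodeLogEvent(payload []byte) (LogEvent, error) {
	var ev LogEvent
	err := json.Unmarshal(payload, &ev)
	return ev, err
}

// sample is the message shown above, copied verbatim.
const sample = `{
	"level": "ERROR",
	"logger": "samber/slog-kafka",
	"message": "a message",
	"timestamp": "2023-04-30T01:33:21.676768Z",
	"error": {"error": "an error", "kind": "*errors.errorString", "stack": null},
	"extra": {"release": "v1.0.0"},
	"user": {"created_at": "2023-04-30T01:33:21.676704Z", "id": "user-123"}
}`

func main() {
	ev, err := decodeLogEvent([]byte(sample))
	if err != nil {
		panic(err)
	}
	fmt.Println(ev.Level, ev.Message, ev.Extra["release"])
}
```

Keeping extra and user as maps avoids guessing at attribute names that vary from one application to another.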
Tracing
Import the samber/slog-otel library.
import (
	"context"
	"log/slog"

	slogkafka "github.com/samber/slog-kafka/v2"
	slogotel "github.com/samber/slog-otel"
	"go.opentelemetry.io/otel/sdk/trace"
)

func main() {
	tp := trace.NewTracerProvider(
		trace.WithSampler(trace.AlwaysSample()),
	)
	tracer := tp.Tracer("hello/world")

	ctx, span := tracer.Start(context.Background(), "foo")
	defer span.End()

	span.AddEvent("bar")

	logger := slog.New(
		slogkafka.Option{
			// ...
			AttrFromContext: []func(ctx context.Context) []slog.Attr{
				slogotel.ExtractOtelAttrFromContext([]string{"tracing"}, "trace_id", "span_id"),
			},
		}.NewKafkaHandler(),
	)

	logger.ErrorContext(ctx, "a message")
}
🤝 Contributing
- Ping me on Twitter @samuelberthe (DMs, mentions, whatever :))
- Fork the project
- Fix open issues or request new features
Don't hesitate ;)
# Install some dev dependencies
make tools
# Run tests
make test
# or
make watch-test
💫 Show your support
Give a ⭐️ if this project helped you!
📝 License
Copyright © 2023 Samuel Berthe.
This project is MIT licensed.