You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
399 lines
15 KiB
399 lines
15 KiB
From 7b2292ee20c5d49053cc5262dfbc99ce121b9b74 Mon Sep 17 00:00:00 2001
|
|
From: tifayuki <tifayuki@gmail.com>
|
|
Date: Tue, 13 Feb 2018 13:30:56 -0800
|
|
Subject: [PATCH 1/4] Add notification metrics
|
|
|
|
It adds notification related prometheus metrics, including:
|
|
- total count for events/success/failure/error
|
|
- total count for notification per each status code
|
|
- gauge of the pending notification queue
|
|
|
|
Signed-off-by: tifayuki <tifayuki@gmail.com>
|
|
---
|
|
metrics/prometheus.go | 3 +++
|
|
notifications/metrics.go | 28 ++++++++++++++++++++++++++++
|
|
2 files changed, 31 insertions(+)
|
|
|
|
diff --git a/metrics/prometheus.go b/metrics/prometheus.go
|
|
index b5a532144..91b32b23d 100644
|
|
--- a/metrics/prometheus.go
|
|
+++ b/metrics/prometheus.go
|
|
@@ -10,4 +10,7 @@ const (
|
|
var (
|
|
// StorageNamespace is the prometheus namespace of blob/cache related operations
|
|
StorageNamespace = metrics.NewNamespace(NamespacePrefix, "storage", nil)
|
|
+
|
|
+ // NotificationsNamespace is the prometheus namespace of notification related metrics
|
|
+ NotificationsNamespace = metrics.NewNamespace(NamespacePrefix, "notifications", nil)
|
|
)
|
|
diff --git a/notifications/metrics.go b/notifications/metrics.go
|
|
index a20af1687..69960e9cb 100644
|
|
--- a/notifications/metrics.go
|
|
+++ b/notifications/metrics.go
|
|
@@ -5,6 +5,18 @@ import (
|
|
"fmt"
|
|
"net/http"
|
|
"sync"
|
|
+
|
|
+ prometheus "github.com/docker/distribution/metrics"
|
|
+ "github.com/docker/go-metrics"
|
|
+)
|
|
+
|
|
+var (
|
|
+ // eventsCounter counts total events of incoming, success, failure, and errors
|
|
+ eventsCounter = prometheus.NotificationsNamespace.NewLabeledCounter("events", "The number of total events", "type")
|
|
+ // pendingGauge measures the pending queue size
|
|
+ pendingGauge = prometheus.NotificationsNamespace.NewGauge("pending", "The gauge of pending events in queue", metrics.Total)
|
|
+ // statusCounter counts the total notification call per each status code
|
|
+ statusCounter = prometheus.NotificationsNamespace.NewLabeledCounter("status", "The number of status code", "code")
|
|
)
|
|
|
|
// EndpointMetrics track various actions taken by the endpoint, typically by
|
|
@@ -61,6 +73,9 @@ func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Eve
|
|
defer emsl.safeMetrics.Unlock()
|
|
emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
|
|
emsl.Successes += len(events)
|
|
+
|
|
+ statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status))).Inc(1)
|
|
+ eventsCounter.WithValues("Successes").Inc(1)
|
|
}
|
|
|
|
func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) {
|
|
@@ -68,12 +83,17 @@ func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Eve
|
|
defer emsl.safeMetrics.Unlock()
|
|
emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
|
|
emsl.Failures += len(events)
|
|
+
|
|
+ statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status))).Inc(1)
|
|
+ eventsCounter.WithValues("Failures").Inc(1)
|
|
}
|
|
|
|
func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) {
|
|
emsl.safeMetrics.Lock()
|
|
defer emsl.safeMetrics.Unlock()
|
|
emsl.Errors += len(events)
|
|
+
|
|
+ eventsCounter.WithValues("Errors").Inc(1)
|
|
}
|
|
|
|
// endpointMetricsEventQueueListener maintains the incoming events counter and
|
|
@@ -87,12 +107,17 @@ func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) {
|
|
defer eqc.Unlock()
|
|
eqc.Events += len(events)
|
|
eqc.Pending += len(events)
|
|
+
|
|
+ eventsCounter.WithValues("Events").Inc()
|
|
+ pendingGauge.Inc(1)
|
|
}
|
|
|
|
func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) {
|
|
eqc.Lock()
|
|
defer eqc.Unlock()
|
|
eqc.Pending -= len(events)
|
|
+
|
|
+ pendingGauge.Dec(1)
|
|
}
|
|
|
|
// endpoints is global registry of endpoints used to report metrics to expvar
|
|
@@ -149,4 +174,7 @@ func init() {
|
|
}))
|
|
|
|
registry.(*expvar.Map).Set("notifications", &notifications)
|
|
+
|
|
+ // register prometheus metrics
|
|
+ metrics.Register(prometheus.NotificationsNamespace)
|
|
}
|
|
|
|
From 4497e40eda1e0024f055c09ab480b7816a1147b1 Mon Sep 17 00:00:00 2001
|
|
From: Honglin Feng <tifayuki@gmail.com>
|
|
Date: Thu, 11 Oct 2018 21:39:02 +0800
|
|
Subject: [PATCH 2/4] add label to the metrics
|
|
|
|
Signed-off-by: Honglin Feng <tifayuki@gmail.com>
|
|
---
|
|
notifications/endpoint.go | 2 +-
|
|
notifications/http_test.go | 2 +-
|
|
notifications/metrics.go | 26 ++++++++++++++------------
|
|
notifications/sinks_test.go | 2 +-
|
|
4 files changed, 17 insertions(+), 15 deletions(-)
|
|
|
|
diff --git a/notifications/endpoint.go b/notifications/endpoint.go
|
|
index a8a52d0c9..854f1dd6c 100644
|
|
--- a/notifications/endpoint.go
|
|
+++ b/notifications/endpoint.go
|
|
@@ -58,7 +58,7 @@ func NewEndpoint(name, url string, config EndpointConfig) *Endpoint {
|
|
endpoint.url = url
|
|
endpoint.EndpointConfig = config
|
|
endpoint.defaults()
|
|
- endpoint.metrics = newSafeMetrics()
|
|
+ endpoint.metrics = newSafeMetrics(name)
|
|
|
|
// Configures the inmemory queue, retry, http pipeline.
|
|
endpoint.Sink = newHTTPSink(
|
|
diff --git a/notifications/http_test.go b/notifications/http_test.go
|
|
index de47f789e..b7845cf95 100644
|
|
--- a/notifications/http_test.go
|
|
+++ b/notifications/http_test.go
|
|
@@ -63,7 +63,7 @@ func TestHTTPSink(t *testing.T) {
|
|
})
|
|
server := httptest.NewTLSServer(serverHandler)
|
|
|
|
- metrics := newSafeMetrics()
|
|
+ metrics := newSafeMetrics("")
|
|
sink := newHTTPSink(server.URL, 0, nil, nil,
|
|
&endpointMetricsHTTPStatusListener{safeMetrics: metrics})
|
|
|
|
diff --git a/notifications/metrics.go b/notifications/metrics.go
|
|
index 69960e9cb..4464edd8f 100644
|
|
--- a/notifications/metrics.go
|
|
+++ b/notifications/metrics.go
|
|
@@ -12,11 +12,11 @@ import (
|
|
|
|
var (
|
|
// eventsCounter counts total events of incoming, success, failure, and errors
|
|
- eventsCounter = prometheus.NotificationsNamespace.NewLabeledCounter("events", "The number of total events", "type")
|
|
+ eventsCounter = prometheus.NotificationsNamespace.NewLabeledCounter("events", "The number of total events", "type", "to")
|
|
// pendingGauge measures the pending queue size
|
|
- pendingGauge = prometheus.NotificationsNamespace.NewGauge("pending", "The gauge of pending events in queue", metrics.Total)
|
|
+ pendingGauge = prometheus.NotificationsNamespace.NewLabeledGauge("pending", "The gauge of pending events in queue", metrics.Total, "to")
|
|
// statusCounter counts the total notification call per each status code
|
|
- statusCounter = prometheus.NotificationsNamespace.NewLabeledCounter("status", "The number of status code", "code")
|
|
+ statusCounter = prometheus.NotificationsNamespace.NewLabeledCounter("status", "The number of status code", "code", "to")
|
|
)
|
|
|
|
// EndpointMetrics track various actions taken by the endpoint, typically by
|
|
@@ -34,14 +34,16 @@ type EndpointMetrics struct {
|
|
// safeMetrics guards the metrics implementation with a lock and provides a
|
|
// safe update function.
|
|
type safeMetrics struct {
|
|
+ EndpointName string
|
|
EndpointMetrics
|
|
sync.Mutex // protects statuses map
|
|
}
|
|
|
|
// newSafeMetrics returns safeMetrics with map allocated.
|
|
-func newSafeMetrics() *safeMetrics {
|
|
+func newSafeMetrics(name string) *safeMetrics {
|
|
var sm safeMetrics
|
|
sm.Statuses = make(map[string]int)
|
|
+ sm.EndpointName = name
|
|
return &sm
|
|
}
|
|
|
|
@@ -74,8 +76,8 @@ func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Eve
|
|
emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
|
|
emsl.Successes += len(events)
|
|
|
|
- statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status))).Inc(1)
|
|
- eventsCounter.WithValues("Successes").Inc(1)
|
|
+ statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status)), emsl.EndpointName).Inc(1)
|
|
+ eventsCounter.WithValues("Successes", emsl.EndpointName).Inc(1)
|
|
}
|
|
|
|
func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) {
|
|
@@ -84,8 +86,8 @@ func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Eve
|
|
emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
|
|
emsl.Failures += len(events)
|
|
|
|
- statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status))).Inc(1)
|
|
- eventsCounter.WithValues("Failures").Inc(1)
|
|
+ statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status)), emsl.EndpointName).Inc(1)
|
|
+ eventsCounter.WithValues("Failures", emsl.EndpointName).Inc(1)
|
|
}
|
|
|
|
func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) {
|
|
@@ -93,7 +95,7 @@ func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) {
|
|
defer emsl.safeMetrics.Unlock()
|
|
emsl.Errors += len(events)
|
|
|
|
- eventsCounter.WithValues("Errors").Inc(1)
|
|
+ eventsCounter.WithValues("Errors", emsl.EndpointName).Inc(1)
|
|
}
|
|
|
|
// endpointMetricsEventQueueListener maintains the incoming events counter and
|
|
@@ -108,8 +110,8 @@ func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) {
|
|
eqc.Events += len(events)
|
|
eqc.Pending += len(events)
|
|
|
|
- eventsCounter.WithValues("Events").Inc()
|
|
- pendingGauge.Inc(1)
|
|
+ eventsCounter.WithValues("Events", eqc.EndpointName).Inc()
|
|
+ pendingGauge.WithValues(eqc.EndpointName).Inc(1)
|
|
}
|
|
|
|
func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) {
|
|
@@ -117,7 +119,7 @@ func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) {
|
|
defer eqc.Unlock()
|
|
eqc.Pending -= len(events)
|
|
|
|
- pendingGauge.Dec(1)
|
|
+ pendingGauge.WithValues(eqc.EndpointName).Dec(1)
|
|
}
|
|
|
|
// endpoints is global registry of endpoints used to report metrics to expvar
|
|
diff --git a/notifications/sinks_test.go b/notifications/sinks_test.go
|
|
index 06f88b2c9..4a69486b5 100644
|
|
--- a/notifications/sinks_test.go
|
|
+++ b/notifications/sinks_test.go
|
|
@@ -66,7 +66,7 @@ func TestBroadcaster(t *testing.T) {
|
|
func TestEventQueue(t *testing.T) {
|
|
const nevents = 1000
|
|
var ts testSink
|
|
- metrics := newSafeMetrics()
|
|
+ metrics := newSafeMetrics("")
|
|
eq := newEventQueue(
|
|
// delayed sync simulates destination slower than channel comms
|
|
&delayedSink{
|
|
|
|
From 73e4232b5171c2988b0daeea517aa07386e7945d Mon Sep 17 00:00:00 2001
|
|
From: Honglin Feng <tifayuki@gmail.com>
|
|
Date: Mon, 15 Oct 2018 19:50:38 +0800
|
|
Subject: [PATCH 3/4] run go fmt
|
|
|
|
Signed-off-by: Honglin Feng <tifayuki@gmail.com>
|
|
---
|
|
registry/storage/driver/s3-aws/s3.go | 10 +++++-----
|
|
registry/storage/linkedblobstore.go | 16 ++++++++--------
|
|
registry/storage/linkedblobstore_test.go | 4 ++--
|
|
3 files changed, 15 insertions(+), 15 deletions(-)
|
|
|
|
diff --git a/registry/storage/driver/s3-aws/s3.go b/registry/storage/driver/s3-aws/s3.go
|
|
index 800435d01..9cd87dbab 100644
|
|
--- a/registry/storage/driver/s3-aws/s3.go
|
|
+++ b/registry/storage/driver/s3-aws/s3.go
|
|
@@ -476,11 +476,11 @@ func New(params DriverParameters) (*Driver, error) {
|
|
// }
|
|
|
|
d := &driver{
|
|
- S3: s3obj,
|
|
- Bucket: params.Bucket,
|
|
- ChunkSize: params.ChunkSize,
|
|
- Encrypt: params.Encrypt,
|
|
- KeyID: params.KeyID,
|
|
+ S3: s3obj,
|
|
+ Bucket: params.Bucket,
|
|
+ ChunkSize: params.ChunkSize,
|
|
+ Encrypt: params.Encrypt,
|
|
+ KeyID: params.KeyID,
|
|
MultipartCopyChunkSize: params.MultipartCopyChunkSize,
|
|
MultipartCopyMaxConcurrency: params.MultipartCopyMaxConcurrency,
|
|
MultipartCopyThresholdSize: params.MultipartCopyThresholdSize,
|
|
diff --git a/registry/storage/linkedblobstore.go b/registry/storage/linkedblobstore.go
|
|
index de591c8a5..3fb1da26f 100644
|
|
--- a/registry/storage/linkedblobstore.go
|
|
+++ b/registry/storage/linkedblobstore.go
|
|
@@ -312,14 +312,14 @@ func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string
|
|
}
|
|
|
|
bw := &blobWriter{
|
|
- ctx: ctx,
|
|
- blobStore: lbs,
|
|
- id: uuid,
|
|
- startedAt: startedAt,
|
|
- digester: digest.Canonical.Digester(),
|
|
- fileWriter: fw,
|
|
- driver: lbs.driver,
|
|
- path: path,
|
|
+ ctx: ctx,
|
|
+ blobStore: lbs,
|
|
+ id: uuid,
|
|
+ startedAt: startedAt,
|
|
+ digester: digest.Canonical.Digester(),
|
|
+ fileWriter: fw,
|
|
+ driver: lbs.driver,
|
|
+ path: path,
|
|
resumableDigestEnabled: lbs.resumableDigestEnabled,
|
|
}
|
|
|
|
diff --git a/registry/storage/linkedblobstore_test.go b/registry/storage/linkedblobstore_test.go
|
|
index e0ffd2796..85376f715 100644
|
|
--- a/registry/storage/linkedblobstore_test.go
|
|
+++ b/registry/storage/linkedblobstore_test.go
|
|
@@ -162,8 +162,8 @@ type mockBlobDescriptorServiceFactory struct {
|
|
func (f *mockBlobDescriptorServiceFactory) BlobAccessController(svc distribution.BlobDescriptorService) distribution.BlobDescriptorService {
|
|
return &mockBlobDescriptorService{
|
|
BlobDescriptorService: svc,
|
|
- t: f.t,
|
|
- stats: f.stats,
|
|
+ t: f.t,
|
|
+ stats: f.stats,
|
|
}
|
|
}
|
|
|
|
|
|
From 5c66b577b027e3b314680f245be4213a002fcee0 Mon Sep 17 00:00:00 2001
|
|
From: Honglin Feng <tifayuki@gmail.com>
|
|
Date: Mon, 15 Oct 2018 20:18:36 +0800
|
|
Subject: [PATCH 4/4] run go fmt and goimports
|
|
|
|
Signed-off-by: Honglin Feng <tifayuki@gmail.com>
|
|
---
|
|
registry/storage/driver/s3-aws/s3.go | 10 +++++-----
|
|
registry/storage/linkedblobstore.go | 16 ++++++++--------
|
|
registry/storage/linkedblobstore_test.go | 4 ++--
|
|
3 files changed, 15 insertions(+), 15 deletions(-)
|
|
|
|
diff --git a/registry/storage/driver/s3-aws/s3.go b/registry/storage/driver/s3-aws/s3.go
|
|
index 9cd87dbab..800435d01 100644
|
|
--- a/registry/storage/driver/s3-aws/s3.go
|
|
+++ b/registry/storage/driver/s3-aws/s3.go
|
|
@@ -476,11 +476,11 @@ func New(params DriverParameters) (*Driver, error) {
|
|
// }
|
|
|
|
d := &driver{
|
|
- S3: s3obj,
|
|
- Bucket: params.Bucket,
|
|
- ChunkSize: params.ChunkSize,
|
|
- Encrypt: params.Encrypt,
|
|
- KeyID: params.KeyID,
|
|
+ S3: s3obj,
|
|
+ Bucket: params.Bucket,
|
|
+ ChunkSize: params.ChunkSize,
|
|
+ Encrypt: params.Encrypt,
|
|
+ KeyID: params.KeyID,
|
|
MultipartCopyChunkSize: params.MultipartCopyChunkSize,
|
|
MultipartCopyMaxConcurrency: params.MultipartCopyMaxConcurrency,
|
|
MultipartCopyThresholdSize: params.MultipartCopyThresholdSize,
|
|
diff --git a/registry/storage/linkedblobstore.go b/registry/storage/linkedblobstore.go
|
|
index 3fb1da26f..de591c8a5 100644
|
|
--- a/registry/storage/linkedblobstore.go
|
|
+++ b/registry/storage/linkedblobstore.go
|
|
@@ -312,14 +312,14 @@ func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string
|
|
}
|
|
|
|
bw := &blobWriter{
|
|
- ctx: ctx,
|
|
- blobStore: lbs,
|
|
- id: uuid,
|
|
- startedAt: startedAt,
|
|
- digester: digest.Canonical.Digester(),
|
|
- fileWriter: fw,
|
|
- driver: lbs.driver,
|
|
- path: path,
|
|
+ ctx: ctx,
|
|
+ blobStore: lbs,
|
|
+ id: uuid,
|
|
+ startedAt: startedAt,
|
|
+ digester: digest.Canonical.Digester(),
|
|
+ fileWriter: fw,
|
|
+ driver: lbs.driver,
|
|
+ path: path,
|
|
resumableDigestEnabled: lbs.resumableDigestEnabled,
|
|
}
|
|
|
|
diff --git a/registry/storage/linkedblobstore_test.go b/registry/storage/linkedblobstore_test.go
|
|
index 85376f715..e0ffd2796 100644
|
|
--- a/registry/storage/linkedblobstore_test.go
|
|
+++ b/registry/storage/linkedblobstore_test.go
|
|
@@ -162,8 +162,8 @@ type mockBlobDescriptorServiceFactory struct {
|
|
func (f *mockBlobDescriptorServiceFactory) BlobAccessController(svc distribution.BlobDescriptorService) distribution.BlobDescriptorService {
|
|
return &mockBlobDescriptorService{
|
|
BlobDescriptorService: svc,
|
|
- t: f.t,
|
|
- stats: f.stats,
|
|
+ t: f.t,
|
|
+ stats: f.stats,
|
|
}
|
|
}
|
|
|