@@ -506,17 +506,6 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe
506506 return & logproto.PushResponse {}, httpgrpc .Errorf (http .StatusUnprocessableEntity , validation .MissingStreamsErrorMsg )
507507 }
508508
509- if d .cfg .IngestLimitsEnabled {
510- exceedsLimits , err := d .exceedsLimits (ctx , tenantID , req .Streams )
511- if err != nil {
512- level .Error (d .logger ).Log ("msg" , "failed to check if request exceeds limits, request has been accepted" , "err" , err )
513- } else if len (exceedsLimits .RejectedStreams ) > 0 {
514- level .Error (d .logger ).Log ("msg" , "request exceeded limits" , "tenant" , tenantID )
515- } else {
516- level .Debug (d .logger ).Log ("msg" , "request accepted" , "tenant" , tenantID )
517- }
518- }
519-
520509 // First we flatten out the request into a list of samples.
521510 // We use the heuristic of 1 sample per TS to size the array.
522511 // We also work out the hash value at the same time.
@@ -706,6 +695,17 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe
706695 return & logproto.PushResponse {}, validationErr
707696 }
708697
698+ if d .cfg .IngestLimitsEnabled {
699+ exceedsLimits , err := d .exceedsLimits (ctx , tenantID , streams )
700+ if err != nil {
701+ level .Error (d .logger ).Log ("msg" , "failed to check if request exceeds limits, request has been accepted" , "err" , err )
702+ } else if len (exceedsLimits .RejectedStreams ) > 0 {
703+ level .Error (d .logger ).Log ("msg" , "request exceeded limits" , "tenant" , tenantID )
704+ } else {
705+ level .Debug (d .logger ).Log ("msg" , "request accepted" , "tenant" , tenantID )
706+ }
707+ }
708+
709709 if ! d .ingestionRateLimiter .AllowN (now , tenantID , validationContext .validationMetrics .aggregatedPushStats .lineSize ) {
710710 d .trackDiscardedData (ctx , req , validationContext , tenantID , validationContext .validationMetrics , validation .RateLimited , streamResolver )
711711
@@ -1152,7 +1152,7 @@ func (d *Distributor) sendStreams(task pushIngesterTask) {
11521152 }
11531153}
11541154
1155- func (d * Distributor ) exceedsLimits (ctx context.Context , tenantID string , streams []logproto. Stream ) (* logproto.ExceedsLimitsResponse , error ) {
1155+ func (d * Distributor ) exceedsLimits (ctx context.Context , tenantID string , streams []KeyedStream ) (* logproto.ExceedsLimitsResponse , error ) {
11561156 // We use an FNV-1 of all stream hashes in the request to load balance requests
11571157 // to limits-frontends instances.
11581158 h := fnv .New32 ()
@@ -1165,11 +1165,11 @@ func (d *Distributor) exceedsLimits(ctx context.Context, tenantID string, stream
11651165 for _ , stream := range streams {
11661166 // Add the stream hash to FNV-1.
11671167 buf := make ([]byte , binary .MaxVarintLen64 )
1168- binary .PutUvarint (buf , stream .Hash )
1168+ binary .PutUvarint (buf , stream .HashKeyNoShard )
11691169 _ , _ = h .Write (buf )
11701170 // Add the stream hash to the request. This is sent to limits-frontend.
11711171 streamHashes = append (streamHashes , & logproto.StreamMetadata {
1172- StreamHash : stream .Hash ,
1172+ StreamHash : stream .HashKeyNoShard ,
11731173 })
11741174 }
11751175
0 commit comments