Lines Matching refs:bom

84 	bom := om.boms[broker]
85 if bom == nil {
86 bom = om.newBrokerOffsetManager(broker)
87 om.boms[broker] = bom
90 bom.refs++
92 return bom
95 func (om *offsetManager) unrefBrokerOffsetManager(bom *brokerOffsetManager) {
99 bom.refs--
101 if bom.refs == 0 {
102 close(bom.updateSubscriptions)
103 if om.boms[bom.broker] == bom {
104 delete(om.boms, bom.broker)
109 func (om *offsetManager) abandonBroker(bom *brokerOffsetManager) {
113 delete(om.boms, bom.broker)
393 bom := &brokerOffsetManager{
401 go withRecover(bom.mainLoop)
403 return bom
406 func (bom *brokerOffsetManager) mainLoop() {
409 case <-bom.timer.C:
410 if len(bom.subscriptions) > 0 {
411 bom.flushToBroker()
413 case s, ok := <-bom.updateSubscriptions:
415 bom.timer.Stop()
418 if _, ok := bom.subscriptions[s]; ok {
419 delete(bom.subscriptions, s)
421 bom.subscriptions[s] = none{}
427 func (bom *brokerOffsetManager) flushToBroker() {
428 request := bom.constructRequest()
433 response, err := bom.broker.CommitOffset(request)
436 bom.abort(err)
440 for s := range bom.subscriptions {
450 delete(bom.subscriptions, s)
456 delete(bom.subscriptions, s)
467 delete(bom.subscriptions, s)
471 delete(bom.subscriptions, s)
477 func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
480 ConsumerGroup: bom.parent.group,
483 for s := range bom.subscriptions {
498 func (bom *brokerOffsetManager) abort(err error) {
499 _ = bom.broker.Close() // we don't care about the error this might return, we already have one
500 bom.parent.abandonBroker(bom)
502 for pom := range bom.subscriptions {
507 for s := range bom.updateSubscriptions {
508 if _, ok := bom.subscriptions[s]; !ok {
514 bom.subscriptions = make(map[*partitionOffsetManager]none)