 7f8e3192cd
			
		
	
	
		7f8e3192cd
		
			
		
	
	
	
	
		
			
			* Allow common redis and leveldb connections Prevents multiple reopening of redis and leveldb connections to the same place by sharing connections. Further allows for more configurable redis connection type using the redisURI and a leveldbURI scheme. Signed-off-by: Andrew Thornton <art27@cantab.net> * add unit-test Signed-off-by: Andrew Thornton <art27@cantab.net> * as per @lunny Signed-off-by: Andrew Thornton <art27@cantab.net> * add test Signed-off-by: Andrew Thornton <art27@cantab.net> * Update modules/cache/cache_redis.go * Update modules/queue/queue_disk.go * Update modules/cache/cache_redis.go * Update modules/cache/cache_redis.go * Update modules/queue/unique_queue_disk.go * Update modules/queue/queue_disk.go * Update modules/queue/unique_queue_disk.go * Update modules/session/redis.go Co-authored-by: techknowlogick <techknowlogick@gitea.io> Co-authored-by: Lauris BH <lauris@nix.lv>
		
			
				
	
	
		
			117 lines
		
	
	
	
		
			2.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			117 lines
		
	
	
	
		
			2.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2019 The Gitea Authors. All rights reserved.
 | |
| // Use of this source code is governed by a MIT-style
 | |
| // license that can be found in the LICENSE file.
 | |
| 
 | |
| package queue
 | |
| 
 | |
| import (
 | |
| 	"code.gitea.io/gitea/modules/nosql"
 | |
| 
 | |
| 	"gitea.com/lunny/levelqueue"
 | |
| )
 | |
| 
 | |
| // LevelQueueType is the type for level queue
 | |
| const LevelQueueType Type = "level"
 | |
| 
 | |
| // LevelQueueConfiguration is the configuration for a LevelQueue
 | |
| type LevelQueueConfiguration struct {
 | |
| 	ByteFIFOQueueConfiguration
 | |
| 	DataDir          string
 | |
| 	ConnectionString string
 | |
| 	QueueName        string
 | |
| }
 | |
| 
 | |
| // LevelQueue implements a disk library queue
 | |
| type LevelQueue struct {
 | |
| 	*ByteFIFOQueue
 | |
| }
 | |
| 
 | |
| // NewLevelQueue creates a ledis local queue
 | |
| func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
 | |
| 	configInterface, err := toConfig(LevelQueueConfiguration{}, cfg)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	config := configInterface.(LevelQueueConfiguration)
 | |
| 
 | |
| 	if len(config.ConnectionString) == 0 {
 | |
| 		config.ConnectionString = config.DataDir
 | |
| 	}
 | |
| 
 | |
| 	byteFIFO, err := NewLevelQueueByteFIFO(config.ConnectionString, config.QueueName)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	byteFIFOQueue, err := NewByteFIFOQueue(LevelQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	queue := &LevelQueue{
 | |
| 		ByteFIFOQueue: byteFIFOQueue,
 | |
| 	}
 | |
| 	queue.qid = GetManager().Add(queue, LevelQueueType, config, exemplar)
 | |
| 	return queue, nil
 | |
| }
 | |
| 
 | |
| var _ (ByteFIFO) = &LevelQueueByteFIFO{}
 | |
| 
 | |
| // LevelQueueByteFIFO represents a ByteFIFO formed from a LevelQueue
 | |
| type LevelQueueByteFIFO struct {
 | |
| 	internal   *levelqueue.Queue
 | |
| 	connection string
 | |
| }
 | |
| 
 | |
| // NewLevelQueueByteFIFO creates a ByteFIFO formed from a LevelQueue
 | |
| func NewLevelQueueByteFIFO(connection, prefix string) (*LevelQueueByteFIFO, error) {
 | |
| 	db, err := nosql.GetManager().GetLevelDB(connection)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	internal, err := levelqueue.NewQueue(db, []byte(prefix), false)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return &LevelQueueByteFIFO{
 | |
| 		connection: connection,
 | |
| 		internal:   internal,
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| // PushFunc will push data into the fifo
 | |
| func (fifo *LevelQueueByteFIFO) PushFunc(data []byte, fn func() error) error {
 | |
| 	if fn != nil {
 | |
| 		if err := fn(); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 	return fifo.internal.LPush(data)
 | |
| }
 | |
| 
 | |
| // Pop pops data from the start of the fifo
 | |
| func (fifo *LevelQueueByteFIFO) Pop() ([]byte, error) {
 | |
| 	data, err := fifo.internal.RPop()
 | |
| 	if err != nil && err != levelqueue.ErrNotFound {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return data, nil
 | |
| }
 | |
| 
 | |
| // Close this fifo
 | |
| func (fifo *LevelQueueByteFIFO) Close() error {
 | |
| 	err := fifo.internal.Close()
 | |
| 	_ = nosql.GetManager().CloseLevelDB(fifo.connection)
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| // Len returns the length of the fifo
 | |
| func (fifo *LevelQueueByteFIFO) Len() int64 {
 | |
| 	return fifo.internal.Len()
 | |
| }
 | |
| 
 | |
| func init() {
 | |
| 	queuesMap[LevelQueueType] = NewLevelQueue
 | |
| }
 |