// Copyright 2015 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and // limitations under the License. package hbasekv import ( "fmt" "math/rand" "net/url" "path/filepath" "strings" "sync" "time" "github.com/juju/errors" "github.com/ngaut/log" "github.com/pingcap/go-hbase" "github.com/pingcap/go-themis" "github.com/pingcap/go-themis/oracle" "github.com/pingcap/go-themis/oracle/oracles" "github.com/pingcap/tidb/kv" ) const ( // hbaseColFamily is the hbase column family name. hbaseColFamily = "f" // hbaseQualifier is the hbase column name. hbaseQualifier = "q" // hbaseFmlAndQual is a shortcut. hbaseFmlAndQual = hbaseColFamily + ":" + hbaseQualifier // fix length conn pool hbaseConnPoolSize = 10 ) var ( hbaseColFamilyBytes = []byte(hbaseColFamily) hbaseQualifierBytes = []byte(hbaseQualifier) ) var ( _ kv.Storage = (*hbaseStore)(nil) ) var ( // ErrInvalidDSN is returned when store dsn is invalid. ErrInvalidDSN = errors.New("invalid dsn") ) type storeCache struct { mu sync.Mutex cache map[string]*hbaseStore } var mc storeCache func init() { mc.cache = make(map[string]*hbaseStore) rand.Seed(time.Now().UnixNano()) } type hbaseStore struct { mu sync.Mutex uuid string storeName string oracle oracle.Oracle conns []hbase.HBaseClient } func (s *hbaseStore) getHBaseClient() hbase.HBaseClient { // return hbase connection randomly return s.conns[rand.Intn(hbaseConnPoolSize)] } func (s *hbaseStore) Begin() (kv.Transaction, error) { s.mu.Lock() defer s.mu.Unlock() hbaseCli := s.getHBaseClient() t, err := themis.NewTxn(hbaseCli, s.oracle) if err != nil { return nil, errors.Trace(err) } txn := newHbaseTxn(t, s.storeName) return txn, nil } func (s *hbaseStore) GetSnapshot(ver kv.Version) (kv.Snapshot, error) { hbaseCli := s.getHBaseClient() t, err := themis.NewTxn(hbaseCli, s.oracle) if err != nil { return nil, errors.Trace(err) } return newHbaseSnapshot(t, s.storeName), nil } func (s *hbaseStore) Close() error { mc.mu.Lock() defer mc.mu.Unlock() delete(mc.cache, s.uuid) var err error for _, conn := range s.conns { err = conn.Close() if err != nil { log.Error(err) } } // return last error return err } func (s *hbaseStore) UUID() string { return s.uuid } func (s *hbaseStore) CurrentVersion() (kv.Version, error) { hbaseCli := s.getHBaseClient() t, err := themis.NewTxn(hbaseCli, s.oracle) if err != nil { return kv.Version{Ver: 0}, errors.Trace(err) } defer t.Release() return kv.Version{Ver: t.GetStartTS()}, nil } // Driver implements engine Driver. type Driver struct { } const ( tsoTypeLocal = "local" tsoTypeZK = "zk" tsoZKPath = "/zk/tso" ) // Open opens or creates an HBase storage with given path. // // The format of path should be 'hbase://zk1,zk2,zk3/table[?tso=local|zk]'. // If tso is not provided, it will use a local oracle instead. (for test only) func (d Driver) Open(path string) (kv.Storage, error) { mc.mu.Lock() defer mc.mu.Unlock() zks, tso, tableName, err := parsePath(path) if err != nil { return nil, errors.Trace(err) } if tso != tsoTypeLocal && tso != tsoTypeZK { return nil, errors.Trace(ErrInvalidDSN) } uuid := fmt.Sprintf("hbase-%v-%v", zks, tableName) if tso == tsoTypeLocal { log.Warnf("hbase: store(%s) is using local oracle(for test only)", uuid) } if store, ok := mc.cache[uuid]; ok { return store, nil } // create buffered HBase connections, HBaseClient is goroutine-safe, so // it's OK to redistribute to transactions. conns := make([]hbase.HBaseClient, 0, hbaseConnPoolSize) for i := 0; i < hbaseConnPoolSize; i++ { var c hbase.HBaseClient c, err = hbase.NewClient(strings.Split(zks, ","), "/hbase") if err != nil { return nil, errors.Trace(err) } conns = append(conns, c) } c := conns[0] var b bool b, err = c.TableExists(tableName) if err != nil { return nil, errors.Trace(err) } if !b { // Create new hbase table for store. t := hbase.NewTableDesciptor(tableName) cf := hbase.NewColumnFamilyDescriptor(hbaseColFamily) cf.AddAttr("THEMIS_ENABLE", "true") t.AddColumnDesc(cf) //TODO: specify split? if err := c.CreateTable(t, nil); err != nil { return nil, errors.Trace(err) } } var ora oracle.Oracle switch tso { case tsoTypeLocal: ora = oracles.NewLocalOracle() case tsoTypeZK: ora = oracles.NewRemoteOracle(zks, tsoZKPath) } s := &hbaseStore{ uuid: uuid, storeName: tableName, oracle: ora, conns: conns, } mc.cache[uuid] = s return s, nil } func parsePath(path string) (zks, tso, tableName string, err error) { u, err := url.Parse(path) if err != nil { return "", "", "", errors.Trace(err) } if strings.ToLower(u.Scheme) != "hbase" { return "", "", "", errors.Trace(ErrInvalidDSN) } p, tableName := filepath.Split(u.Path) if p != "/" { return "", "", "", errors.Trace(ErrInvalidDSN) } zks = u.Host tso = u.Query().Get("tso") if tso == "" { tso = tsoTypeLocal } return zks, tso, tableName, nil }