go操作mysql


Go操作MySQL

连接

Go语言中的database/sql包提供了保证SQL或类SQL数据库的泛用接口,并不提供具体的数据库驱动。使用database/sql包时必须注入(至少)一个数据库驱动。

我们常用的数据库基本上都有完整的第三方实现。例如:MySQL驱动

下载依赖

go get -u github.com/go-sql-driver/mysql

使用MySQL驱动

func Open(driverName, dataSourceName string) (*DB, error)

Open打开一个dirverName指定的数据库,dataSourceName指定数据源,一般至少包括数据库文件名和其它连接必要的信息。

import (
    "database/sql"

    _ "github.com/go-sql-driver/mysql"
)

func main() {
   // DSN:Data Source Name
    dsn := "user:password@tcp(127.0.0.1:3306)/dbname"
    db, err := sql.Open("mysql", dsn)
    if err != nil {
        panic(err)
    }
    defer db.Close()  // 注意这行代码要写在上面err判断的下面
}

思考题: 为什么上面代码中的defer db.Close()语句不应该写在if err != nil的前面呢?

确保db不为空,db为空的话,调用close()就会报错

初始化连接

Open函数可能只是验证其参数格式是否正确,实际上并不创建与数据库的连接。如果要检查数据源的名称是否真实有效,应该调用Ping方法。

返回的DB对象可以安全地被多个goroutine并发使用,并且维护其自己的空闲连接池。因此,Open函数应该仅被调用一次,很少需要关闭这个DB对象。

接下来,我们定义一个全局变量db,用来保存数据库连接对象。将上面的示例代码拆分出一个独立的initDB函数,只需要在程序启动时调用一次该函数完成全局变量db的初始化,其他函数中就可以直接使用全局变量db了。(注意下方的注意

// 定义一个全局对象db
var db *sql.DB

// 定义一个初始化数据库的函数
func initDB() (err error) {
    // DSN:Data Source Name
    dsn := "user:password@tcp(127.0.0.1:3306)/sql_test?charset=utf8mb4&parseTime=True"
    // 不会校验账号密码是否正确
    // 注意!!!这里不要使用:=,我们是给全局变量赋值,然后在main函数中使用全局变量db
    db, err = sql.Open("mysql", dsn)
    if err != nil {
        return err
    }
    // 尝试与数据库建立连接(校验dsn是否正确)
    err = db.Ping()
    if err != nil {
        return err
    }
    return nil
}

func main() {
    err := initDB() // 调用输出化数据库的函数
    if err != nil {
        fmt.Printf("init db failed,err:%v\n", err)
        return
    }
}

其中sql.DB是表示连接的数据库对象(结构体实例),它保存了连接数据库相关的所有信息。它内部维护着一个具有零到多个底层连接的连接池,它可以安全地被多个goroutine同时使用。

SetMaxOpenConns

func (db *DB) SetMaxOpenConns(n int)

SetMaxOpenConns设置与数据库建立连接的最大数目。 如果n大于0且小于最大闲置连接数,会将最大闲置连接数减小到匹配最大开启连接数的限制。 如果n<=0,不会限制最大开启连接数,默认为0(无限制)。

SetMaxIdleConns

func (db *DB) SetMaxIdleConns(n int)

SetMaxIdleConns设置连接池中的最大闲置连接数。 如果n大于最大开启连接数,则新的最大闲置连接数会减小到匹配最大开启连接数的限制。 如果n<=0,不会保留闲置连接。

CRUD

建库建表

我们先在MySQL中创建一个名为sql_test的数据库

CREATE DATABASE sql_test;

进入该数据库:

use sql_test;

执行以下命令创建一张用于测试的数据表:

CREATE TABLE `user` (
    `id` BIGINT(20) NOT NULL AUTO_INCREMENT,
    `name` VARCHAR(20) DEFAULT '',
    `age` INT(11) DEFAULT '0',
    PRIMARY KEY(`id`)
)ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;

查询

为了方便查询,我们事先定义好一个结构体来存储user表的数据。

type user struct {
    id   int
    age  int
    name string
}

单行查询

单行查询db.QueryRow()执行一次查询,并期望返回最多一行结果(即Row)。QueryRow总是返回非nil的值,直到返回值的Scan方法被调用时,才会返回被延迟的错误。(如:未找到结果)

func (db *DB) QueryRow(query string, args ...interface{}) *Row

具体示例代码:

// 查询单条数据示例
func queryRowDemo() {
    sqlStr := "select id, name, age from user where id=?"
    var u user
    // 非常重要:确保QueryRow之后调用Scan方法,否则持有的数据库链接不会被释放
    err := db.QueryRow(sqlStr, 1).Scan(&u.id, &u.name, &u.age)
    if err != nil {
        fmt.Printf("scan failed, err:%v\n", err)
        return
    }
    fmt.Printf("id:%d name:%s age:%d\n", u.id, u.name, u.age)
}

多行查询

多行查询db.Query()执行一次查询,返回多行结果(即Rows),一般用于执行select命令。参数args表示query中的占位参数。

func (db *DB) Query(query string, args ...interface{}) (*Rows, error)

具体示例代码:

// 查询多条数据示例
func queryMultiRowDemo() {
    sqlStr := "select id, name, age from user where id > ?"
    rows, err := db.Query(sqlStr, 0)
    if err != nil {
        fmt.Printf("query failed, err:%v\n", err)
        return
    }
    // 非常重要:关闭rows释放持有的数据库链接
    defer rows.Close()

    // 循环读取结果集中的数据
    for rows.Next() {
        var u user
        err := rows.Scan(&u.id, &u.name, &u.age)
        if err != nil {
            fmt.Printf("scan failed, err:%v\n", err)
            return
        }
        fmt.Printf("id:%d name:%s age:%d\n", u.id, u.name, u.age)
    }
}

插入数据

插入、更新和删除操作都使用Exec方法。

func (db *DB) Exec(query string, args ...interface{}) (Result, error)

Exec执行一次命令(包括查询、删除、更新、插入等),返回的Result是对已执行的SQL命令的总结。参数args表示query中的占位参数。

具体插入数据示例代码如下:

// 插入数据
func insertRowDemo() {
    sqlStr := "insert into user(name, age) values (?,?)"
    ret, err := db.Exec(sqlStr, "王五", 38)
    if err != nil {
        fmt.Printf("insert failed, err:%v\n", err)
        return
    }
    theID, err := ret.LastInsertId() // 新插入数据的id
    if err != nil {
        fmt.Printf("get lastinsert ID failed, err:%v\n", err)
        return
    }
    fmt.Printf("insert success, the id is %d.\n", theID)
}

更新数据

具体更新数据示例代码如下:

// 更新数据
func updateRowDemo() {
    sqlStr := "update user set age=? where id = ?"
    ret, err := db.Exec(sqlStr, 39, 3)
    if err != nil {
        fmt.Printf("update failed, err:%v\n", err)
        return
    }
    n, err := ret.RowsAffected() // 操作影响的行数
    if err != nil {
        fmt.Printf("get RowsAffected failed, err:%v\n", err)
        return
    }
    fmt.Printf("update success, affected rows:%d\n", n)
}

删除数据

具体删除数据的示例代码如下:

// 删除数据
func deleteRowDemo() {
    sqlStr := "delete from user where id = ?"
    ret, err := db.Exec(sqlStr, 3)
    if err != nil {
        fmt.Printf("delete failed, err:%v\n", err)
        return
    }
    n, err := ret.RowsAffected() // 操作影响的行数
    if err != nil {
        fmt.Printf("get RowsAffected failed, err:%v\n", err)
        return
    }
    fmt.Printf("delete success, affected rows:%d\n", n)
}

MySQL预处理

什么是预处理?

普通SQL语句执行过程:

  1. 客户端对SQL语句进行占位符替换得到完整的SQL语句。
  2. 客户端发送完整SQL语句到MySQL服务端
  3. MySQL服务端执行完整的SQL语句并将结果返回给客户端。

预处理执行过程:

  1. 把SQL语句分成两部分,命令部分与数据部分。
  2. 先把命令部分发送给MySQL服务端,MySQL服务端进行SQL预处理。
  3. 然后把数据部分发送给MySQL服务端,MySQL服务端对SQL语句进行占位符替换。
  4. MySQL服务端执行完整的SQL语句并将结果返回给客户端。

为什么要预处理?

  1. 优化MySQL服务器重复执行SQL的方法,可以提升服务器性能,提前让服务器编译,一次编译多次执行,节省后续编译的成本。
  2. 避免SQL注入问题。

Go实现MySQL预处理

database/sql中使用下面的Prepare方法来实现预处理操作。

func (db *DB) Prepare(query string) (*Stmt, error)

Prepare方法会先将sql语句发送给MySQL服务端,返回一个准备好的状态用于之后的查询和命令。返回值可以同时执行多个查询和命令。

查询操作的预处理示例代码如下:

// 预处理查询示例
func prepareQueryDemo() {
    sqlStr := "select id, name, age from user where id > ?"
    stmt, err := db.Prepare(sqlStr)
    if err != nil {
        fmt.Printf("prepare failed, err:%v\n", err)
        return
    }
    defer stmt.Close()
    rows, err := stmt.Query(0)
    if err != nil {
        fmt.Printf("query failed, err:%v\n", err)
        return
    }
    defer rows.Close()
    // 循环读取结果集中的数据
    for rows.Next() {
        var u user
        err := rows.Scan(&u.id, &u.name, &u.age)
        if err != nil {
            fmt.Printf("scan failed, err:%v\n", err)
            return
        }
        fmt.Printf("id:%d name:%s age:%d\n", u.id, u.name, u.age)
    }
}

插入、更新和删除操作的预处理十分类似,这里以插入操作的预处理为例:

// 预处理插入示例
func prepareInsertDemo() {
    sqlStr := "insert into user(name, age) values (?,?)"
    stmt, err := db.Prepare(sqlStr)
    if err != nil {
        fmt.Printf("prepare failed, err:%v\n", err)
        return
    }
    defer stmt.Close()
    _, err = stmt.Exec("小王子", 18)
    if err != nil {
        fmt.Printf("insert failed, err:%v\n", err)
        return
    }
    _, err = stmt.Exec("沙河娜扎", 18)
    if err != nil {
        fmt.Printf("insert failed, err:%v\n", err)
        return
    }
    fmt.Println("insert success.")
}

SQL注入问题

我们任何时候都不应该自己拼接SQL语句!

这里我们演示一个自行拼接SQL语句的示例,编写一个根据name字段查询user表的函数如下:

// sql注入示例
func sqlInjectDemo(name string) {
    sqlStr := fmt.Sprintf("select id, name, age from user where name='%s'", name)
    fmt.Printf("SQL:%s\n", sqlStr)
    var u user
    err := db.QueryRow(sqlStr).Scan(&u.id, &u.name, &u.age)
    if err != nil {
        fmt.Printf("exec failed, err:%v\n", err)
        return
    }
    fmt.Printf("user:%#v\n", u)
}

此时以下输入字符串都可以引发SQL注入问题:

sqlInjectDemo("xxx' or 1=1#")
sqlInjectDemo("xxx' union select * from user #")
sqlInjectDemo("xxx' and (select count(*) from user) <10 #")

补充:不同的数据库中,SQL语句使用的占位符语法不尽相同。

数据库 占位符语法
MySQL
PostgreSQL $1, $2
SQLite ?$1
Oracle :name

Go实现MySQL事务

什么是事务?

事务:一个最小的不可再分的工作单元;通常一个事务对应一个完整的业务(例如银行账户转账业务,该业务就是一个最小的工作单元),同时这个完整的业务需要执行多次的DML(insert、update、delete)语句共同联合完成。A转账给B,这里面就需要执行两次update操作。

在MySQL中只有使用了Innodb数据库引擎的数据库或表才支持事务。事务处理可以用来维护数据库的完整性,保证成批的SQL语句要么全部执行,要么全部不执行。

事务的ACID

通常事务必须满足4个条件(ACID):原子性(Atomicity,或称不可分割性)、一致性(Consistency)、隔离性(Isolation,又称独立性)、持久性(Durability)。

条件 解释
原子性 一个事务(transaction)中的所有操作,要么全部完成,要么全部不完成,不会结束在中间某个环节。事务在执行过程中发生错误,会被回滚(Rollback)到事务开始前的状态,就像这个事务从来没有执行过一样。
一致性 在事务开始之前和事务结束以后,数据库的完整性没有被破坏。这表示写入的资料必须完全符合所有的预设规则,这包含资料的精确度、串联性以及后续数据库可以自发性地完成预定的工作。
隔离性 数据库允许多个并发事务同时对其数据进行读写和修改的能力,隔离性可以防止多个事务并发执行时由于交叉执行而导致数据的不一致。事务隔离分为不同级别,包括读未提交(Read uncommitted)、读提交(read committed)、可重复读(repeatable read)和串行化(Serializable)。
持久性 事务处理结束后,对数据的修改就是永久的,即便系统故障也不会丢失。

事务相关方法

Go语言中使用以下三个方法实现MySQL中的事务操作。 开始事务

func (db *DB) Begin() (*Tx, error)

提交事务

func (tx *Tx) Commit() error

回滚事务

func (tx *Tx) Rollback() error

事务示例

下面的代码演示了一个简单的事务操作,该事物操作能够确保两次更新操作要么同时成功要么同时失败,不会存在中间状态。

// 事务操作示例
func transactionDemo() {
    tx, err := db.Begin() // 开启事务
    if err != nil {
        if tx != nil {
            tx.Rollback() // 回滚
        }
        fmt.Printf("begin trans failed, err:%v\n", err)
        return
    }
    sqlStr1 := "Update user set age=30 where id=?"
    ret1, err := tx.Exec(sqlStr1, 2)
    if err != nil {
        tx.Rollback() // 回滚
        fmt.Printf("exec sql1 failed, err:%v\n", err)
        return
    }
    affRow1, err := ret1.RowsAffected()
    if err != nil {
        tx.Rollback() // 回滚
        fmt.Printf("exec ret1.RowsAffected() failed, err:%v\n", err)
        return
    }

    sqlStr2 := "Update user set age=40 where id=?"
    ret2, err := tx.Exec(sqlStr2, 3)
    if err != nil {
        tx.Rollback() // 回滚
        fmt.Printf("exec sql2 failed, err:%v\n", err)
        return
    }
    affRow2, err := ret2.RowsAffected()
    if err != nil {
        tx.Rollback() // 回滚
        fmt.Printf("exec ret1.RowsAffected() failed, err:%v\n", err)
        return
    }

    fmt.Println(affRow1, affRow2)
    if affRow1 == 1 && affRow2 == 1 {
        fmt.Println("事务提交啦...")
        tx.Commit() // 提交事务
    } else {
        tx.Rollback()
        fmt.Println("事务回滚啦...")
    }

    fmt.Println("exec trans success!")
}

sqlx库使用指南

在项目中我们通常可能会使用database/sql连接MySQL数据库。本文借助使用sqlx实现批量插入数据的例子,介绍了sqlx中可能被你忽视了的sqlx.InDB.NamedExec方法。

sqlx介绍

在项目中我们通常可能会使用database/sql连接MySQL数据库。sqlx可以认为是Go语言内置database/sql的超集,它在优秀的内置database/sql基础上提供了一组扩展。这些扩展中除了大家常用来查询的Get(dest interface{}, ...) errorSelect(dest interface{}, ...) error外还有很多其他强大的功能。

安装sqlx

go get github.com/jmoiron/sqlx

基本使用

连接数据库

import(
    "fmt"
    "github.com/jmoiron/sqlx"
    _ "github.com/go-sql-drivr/mysql"
)

var db *sqlx.DB

func initDB() (err error) {
    dsn := "root:password@tcp(127.0.0.1:3306)/sql_demo?charset=utf8mb4&parseTime=True"
    // 也可以使用MustConnect连接不成功就panic
    db, err = sqlx.Connect("mysql", dsn)
    if err != nil {
        fmt.Printf("connect DB failed, err:%v\n", err)
        return
    }
    db.SetMaxOpenConns(20)
    db.SetMaxIdleConns(10)
    return
}

查询

查询单行数据示例代码如下:

// 查询单条数据示例
func queryRowDemo() {
    sqlStr := "select id, name, age from user where id=?"
    var u user
    err := db.Get(&u, sqlStr, 1)
    if err != nil {
        fmt.Printf("get failed, err:%v\n", err)
        return
    }
    fmt.Printf("id:%d name:%s age:%d\n", u.ID, u.Name, u.Age)
}

查询多行数据示例代码如下:

// 查询多条数据示例
func queryMultiRowDemo() {
    sqlStr := "select id, name, age from user where id > ?"
    var users []user
    err := db.Select(&users, sqlStr, 0)
    if err != nil {
        fmt.Printf("query failed, err:%v\n", err)
        return
    }
    fmt.Printf("users:%#v\n", users)
}

插入、更新和删除

sqlx中的exec方法与原生sql中的exec使用基本一致:

// 插入数据
func insertRowDemo() {
    sqlStr := "insert into user(name, age) values (?,?)"
    ret, err := db.Exec(sqlStr, "沙河小王子", 19)
    if err != nil {
        fmt.Printf("insert failed, err:%v\n", err)
        return
    }
    theID, err := ret.LastInsertId() // 新插入数据的id
    if err != nil {
        fmt.Printf("get lastinsert ID failed, err:%v\n", err)
        return
    }
    fmt.Printf("insert success, the id is %d.\n", theID)
}

// 更新数据
func updateRowDemo() {
    sqlStr := "update user set age=? where id = ?"
    ret, err := db.Exec(sqlStr, 39, 6)
    if err != nil {
        fmt.Printf("update failed, err:%v\n", err)
        return
    }
    n, err := ret.RowsAffected() // 操作影响的行数
    if err != nil {
        fmt.Printf("get RowsAffected failed, err:%v\n", err)
        return
    }
    fmt.Printf("update success, affected rows:%d\n", n)
}

// 删除数据
func deleteRowDemo() {
    sqlStr := "delete from user where id = ?"
    ret, err := db.Exec(sqlStr, 6)
    if err != nil {
        fmt.Printf("delete failed, err:%v\n", err)
        return
    }
    n, err := ret.RowsAffected() // 操作影响的行数
    if err != nil {
        fmt.Printf("get RowsAffected failed, err:%v\n", err)
        return
    }
    fmt.Printf("delete success, affected rows:%d\n", n)
}

NamedExec

DB.NamedExec方法用来绑定SQL语句与结构体或map中的同名字段。

func insertUserDemo()(err error){
    sqlStr := "INSERT INTO user (name,age) VALUES (:name,:age)"
    _, err = db.NamedExec(sqlStr,
        map[string]interface{}{
            "name": "七米",
            "age": 28,
        })
    return
}

NamedQuery

DB.NamedExec同理,这里是支持查询。

func namedQuery(){
    sqlStr := "SELECT * FROM user WHERE name=:name"
    // 使用map做命名查询
    rows, err := db.NamedQuery(sqlStr, map[string]interface{}{"name": "七米"})
    if err != nil {
        fmt.Printf("db.NamedQuery failed, err:%v\n", err)
        return
    }
    defer rows.Close()
    for rows.Next(){
        var u user
        err := rows.StructScan(&u)
        if err != nil {
            fmt.Printf("scan failed, err:%v\n", err)
            continue
        }
        fmt.Printf("user:%#v\n", u)
    }
    
    fmt.Println("--------------------------------------------")

    u := user{
        Name: "七米",
    }
    // 使用结构体命名查询,根据结构体字段的 db tag进行映射
    rows, err = db.NamedQuery(sqlStr, u)
    if err != nil {
        fmt.Printf("db.NamedQuery failed, err:%v\n", err)
        return
    }
    defer rows.Close()
    for rows.Next(){
        var u user
        err := rows.StructScan(&u)
        if err != nil {
            fmt.Printf("scan failed, err:%v\n", err)
            continue
        }
        fmt.Printf("user:%#v\n", u)
    }
}

事务操作

对于事务操作,我们可以使用sqlx中提供的db.Beginx()tx.Exec()方法。示例代码如下:

func transactionDemo2()(err error) {
    tx, err := db.Beginx() // 开启事务
    if err != nil {
        fmt.Printf("begin trans failed, err:%v\n", err)
        return err
    }
    defer func() {
        if p := recover(); p != nil {
            tx.Rollback()
            panic(p) // re-throw panic after Rollback
        } else if err != nil {
            fmt.Println("rollback")
            tx.Rollback() // err is non-nil; don't change it
        } else {
            err = tx.Commit() // err is nil; if Commit returns error update err
            fmt.Println("commit")
        }
    }()

    sqlStr1 := "Update user set age=20 where id=?"

    rs, err := tx.Exec(sqlStr1, 1)
    if err!= nil{
        return err
    }
    n, err := rs.RowsAffected()
    if err != nil {
        return err
    }
    if n != 1 {
        return errors.New("exec sqlStr1 failed")
    }
    sqlStr2 := "Update user set age=50 where i=?"
    rs, err = tx.Exec(sqlStr2, 5)
    if err!=nil{
        return err
    }
    n, err = rs.RowsAffected()
    if err != nil {
        return err
    }
    if n != 1 {
        return errors.New("exec sqlStr1 failed")
    }
    return err
}

sqlx.In

sqlx.Insqlx提供的一个非常方便的函数。

sqlx.In的批量插入示例

表结构

为了方便演示插入数据操作,这里创建一个user表,表结构如下:

CREATE TABLE `user` (
    `id` BIGINT(20) NOT NULL AUTO_INCREMENT,
    `name` VARCHAR(20) DEFAULT '',
    `age` INT(11) DEFAULT '0',
    PRIMARY KEY(`id`)
)ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;

结构体

定义一个user结构体,字段通过tag与数据库中user表的列一致。

type User struct {
    Name string `db:"name"`
    Age  int    `db:"age"`
}

bindvars(绑定变量)

查询占位符?在内部称为bindvars(查询占位符),它非常重要。你应该始终使用它们向数据库发送值,因为它们可以防止SQL注入攻击。database/sql不尝试对查询文本进行任何验证;它与编码的参数一起按原样发送到服务器。除非驱动程序实现一个特殊的接口,否则在执行之前,查询是在服务器上准备的。因此bindvars是特定于数据库的:

  • MySQL中使用?
  • PostgreSQL使用枚举的$1$2等bindvar语法
  • SQLite中?$1的语法都支持
  • Oracle中使用:name的语法

bindvars的一个常见误解是,它们用来在sql语句中插入值。它们其实仅用于参数化,不允许更改SQL语句的结构。例如,使用bindvars尝试参数化列或表名将不起作用:

// ?不能用来插入表名(做SQL语句中表名的占位符)
db.Query("SELECT * FROM ?", "mytable")
 
// ?也不能用来插入列名(做SQL语句中列名的占位符)
db.Query("SELECT ?, ? FROM people", "name", "location")

自己拼接语句实现批量插入

比较笨,但是很好理解。就是有多少个User就拼接多少个(?, ?)

// BatchInsertUsers 自行构造批量插入的语句
func BatchInsertUsers(users []*User) error {
    // 存放 (?, ?) 的slice
    valueStrings := make([]string, 0, len(users))
    // 存放values的slice
    valueArgs := make([]interface{}, 0, len(users) * 2)
    // 遍历users准备相关数据
    for _, u := range users {
        // 此处占位符要与插入值的个数对应
        valueStrings = append(valueStrings, "(?, ?)")
        valueArgs = append(valueArgs, u.Name)
        valueArgs = append(valueArgs, u.Age)
    }
    // 自行拼接要执行的具体语句
    stmt := fmt.Sprintf("INSERT INTO user (name, age) VALUES %s",
        strings.Join(valueStrings, ","))
    _, err := DB.Exec(stmt, valueArgs...)
    return err
}

使用sqlx.In实现批量插入

前提是需要我们的结构体实现driver.Valuer接口:

func (u User) Value() (driver.Value, error) {
    return []interface{}{u.Name, u.Age}, nil
}

使用sqlx.In实现批量插入代码如下:

// BatchInsertUsers2 使用sqlx.In帮我们拼接语句和参数, 注意传入的参数是[]interface{}
func BatchInsertUsers2(users []interface{}) error {
    query, args, _ := sqlx.In(
        "INSERT INTO user (name, age) VALUES (?), (?), (?)",
        users..., // 如果arg实现了 driver.Valuer, sqlx.In 会通过调用 Value()来展开它
    )
    fmt.Println(query) // 查看生成的querystring
    fmt.Println(args)  // 查看生成的args
    _, err := DB.Exec(query, args...)
    return err
}

使用NamedExec实现批量插入

注意 :该功能需1.3.1版本以上,并且1.3.1版本目前还有点问题,sql语句最后不能有空格和;,详见issues/690

使用NamedExec实现批量插入的代码如下:

// BatchInsertUsers3 使用NamedExec实现批量插入
func BatchInsertUsers3(users []*User) error {
    _, err := DB.NamedExec("INSERT INTO user (name, age) VALUES (:name, :age)", users)
    return err
}

把上面三种方法综合起来试一下:

func main() {
    err := initDB()
    if err != nil {
        panic(err)
    }
    defer DB.Close()
    u1 := User{Name: "七米", Age: 18}
    u2 := User{Name: "q1mi", Age: 28}
    u3 := User{Name: "小王子", Age: 38}

    // 方法1
    users := []*User{&u1, &u2, &u3}
    err = BatchInsertUsers(users)
    if err != nil {
        fmt.Printf("BatchInsertUsers failed, err:%v\n", err)
    }

    // 方法2
    users2 := []interface{}{u1, u2, u3}
    err = BatchInsertUsers2(users2)
    if err != nil {
        fmt.Printf("BatchInsertUsers2 failed, err:%v\n", err)
    }

    // 方法3
    users3 := []*User{&u1, &u2, &u3}
    err = BatchInsertUsers3(users3)
    if err != nil {
        fmt.Printf("BatchInsertUsers3 failed, err:%v\n", err)
    }
}

sqlx.In的查询示例

关于sqlx.In这里再补充一个用法,在sqlx查询语句中实现In查询和FIND_IN_SET函数。即实现SELECT * FROM user WHERE id in (3, 2, 1);SELECT * FROM user WHERE id in (3, 2, 1) ORDER BY FIND_IN_SET(id, '3,2,1');

in查询

查询id在给定id集合中的数据。

// QueryByIDs 根据给定ID查询
func QueryByIDs(ids []int)(users []User, err error){
    // 动态填充id
    query, args, err := sqlx.In("SELECT name, age FROM user WHERE id IN (?)", ids)
    if err != nil {
        return
    }
    // sqlx.In 返回带 `?` bindvar的查询语句, 我们使用Rebind()重新绑定它
    query = DB.Rebind(query)

    err = DB.Select(&users, query, args...)
    return
}

in查询和FIND_IN_SET函数

查询id在给定id集合的数据并维持给定id集合的顺序。

// QueryAndOrderByIDs 按照指定id查询并维护顺序
func QueryAndOrderByIDs(ids []int)(users []User, err error){
    // 动态填充id
    strIDs := make([]string, 0, len(ids))
    for _, id := range ids {
        strIDs = append(strIDs, fmt.Sprintf("%d", id))
    }
    query, args, err := sqlx.In("SELECT name, age FROM user WHERE id IN (?) ORDER BY FIND_IN_SET(id, ?)", ids, strings.Join(strIDs, ","))
    if err != nil {
        return
    }

    // sqlx.In 返回带 `?` bindvar的查询语句, 我们使用Rebind()重新绑定它
    query = DB.Rebind(query)

    err = DB.Select(&users, query, args...)
    return
}

当然,在这个例子里面你也可以先使用IN查询,然后通过代码按给定的ids对查询结果进行排序。

Go语言操作Redis

在项目开发中redis的使用也比较频繁,本文介绍了Go语言中go-redis库的基本使用。

Redis

在项目开发中redis的使用也比较频繁,本文介绍了Go语言中go-redis库的基本使用。

Redis介绍

Redis是一个开源的内存数据库,Redis提供了多种不同类型的数据结构,很多业务场景下的问题都可以很自然地映射到这些数据结构上。除此之外,通过复制、持久化和客户端分片等特性,我们可以很方便地将Redis扩展成一个能够包含数百GB数据、每秒处理上百万次请求的系统。

Redis支持的数据结构

Redis支持诸如字符串(string)、哈希(hashe)、列表(list)、集合(set)、带范围查询的排序集合(sorted set)、bitmap、hyperloglog、带半径查询的地理空间索引(geospatial index)和流(stream)等数据结构。

Redis应用场景

  • 缓存系统,减轻主数据库(MySQL)的压力。
  • 计数场景,比如微博、抖音中的关注数和粉丝数。
  • 热门排行榜,需要排序的场景特别适合使用ZSET。
  • 利用 LIST 可以实现队列的功能。
  • 利用 HyperLogLog 统计UV、PV等数据。
  • 使用 geospatial index 进行地理位置相关查询。

go-redis库

安装

Go 社区中目前有很多成熟的 redis client 库,比如[https://github.com/gomodule/redigohttps://github.com/go-redis/redis,读者可以自行选择适合自己的库。本书使用 go-redis 这个库来操作 Redis 数据库。

使用以下命令下安装 go-redis 库。

go get github.com/go-redis/redis/v8

连接

普通连接模式

go-redis 库中使用 redis.NewClient 函数连接 Redis 服务器。

这里rdb记得要变成全局变量

var rdb *redis.Client

//初始化连接
func initClient() (err error){
    rdb := redis.NewClient(&redis.Options{
        Addr:     "localhost:6379",
        Password: "", // 密码
        DB:       0,  // 数据库
        PoolSize: 20, // 连接池大小
    })
    
    _,err = rdb.Ping().Result()
    return err
}

除此之外,还可以使用 redis.ParseURL 函数从表示数据源的字符串中解析得到 Redis 服务器的配置信息。

opt, err := redis.ParseURL("redis://<user>:<pass>@localhost:6379/<db>")
if err != nil {
    panic(err)
}

rdb := redis.NewClient(opt)

TLS连接模式

如果使用的是 TLS 连接方式,则需要使用 tls.Config 配置。

rdb := redis.NewClient(&redis.Options{
    TLSConfig: &tls.Config{
        MinVersion: tls.VersionTLS12,
        // Certificates: []tls.Certificate{cert},
    // ServerName: "your.domain.com",
    },
})

Redis Sentinel模式

使用下面的命令连接到由 Redis Sentinel 管理的 Redis 服务器。

rdb := redis.NewFailoverClient(&redis.FailoverOptions{
    MasterName:    "master-name",
    SentinelAddrs: []string{":9126", ":9127", ":9128"},
})

Redis Cluster模式

使用下面的命令连接到 Redis Cluster,go-redis 支持按延迟或随机路由命令。

rdb := redis.NewClusterClient(&redis.ClusterOptions{
    Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},

    // 若要根据延迟或随机路由命令,请启用以下命令之一
    // RouteByLatency: true,
    // RouteRandomly: true,
})

基本使用

执行命令

下面的示例代码演示了 go-redis 库的基本使用。

// doCommand go-redis基本使用示例
func doCommand() {
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()

    // 执行命令获取结果
    val, err := rdb.Get(ctx, "key").Result()
    fmt.Println(val, err)

    // 先获取到命令对象
    cmder := rdb.Get(ctx, "key")
    fmt.Println(cmder.Val()) // 获取值
    fmt.Println(cmder.Err()) // 获取错误

    // 直接执行命令获取错误
    err = rdb.Set(ctx, "key", 10, time.Hour).Err()

    // 直接执行命令获取值
    value := rdb.Get(ctx, "key").Val()
    fmt.Println(value)
}

执行任意命令

go-redis 还提供了一个执行任意命令或自定义命令的 Do 方法,特别是一些 go-redis 库暂时不支持的命令都可以使用该方法执行。具体使用方法如下。

// doDemo rdb.Do 方法使用示例
func doDemo() {
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()

    // 直接执行命令获取错误
    err := rdb.Do(ctx, "set", "key", 10, "EX", 3600).Err()
    fmt.Println(err)

    // 执行命令获取结果
    val, err := rdb.Do(ctx, "get", "key").Result()
    fmt.Println(val, err)
}

redis.Nil

go-redis 库提供了一个 redis.Nil 错误来表示 Key 不存在的错误。因此在使用 go-redis 时需要注意对返回错误的判断。在某些场景下我们应该区别处理 redis.Nil 和其他不为 nil 的错误。

// getValueFromRedis redis.Nil判断
func getValueFromRedis(key, defaultValue string) (string, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()

    val, err := rdb.Get(ctx, key).Result()
    if err != nil {
        // 如果返回的错误是key不存在
        if errors.Is(err, redis.Nil) {
            return defaultValue, nil
        }
        // 出其他错了
        return "", err
    }
    return val, nil
}

其他示例

zset示例

下面的示例代码演示了如何使用 go-redis 库操作 zset。

// zsetDemo 操作zset示例
func zsetDemo() {
    // key
    zsetKey := "language_rank"
    // value
    languages := []*redis.Z{
        {Score: 90.0, Member: "Golang"},
        {Score: 98.0, Member: "Java"},
        {Score: 95.0, Member: "Python"},
        {Score: 97.0, Member: "JavaScript"},
        {Score: 99.0, Member: "C/C++"},
    }
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()

    // ZADD
    err := rdb.ZAdd(ctx, zsetKey, languages...).Err()
    if err != nil {
        fmt.Printf("zadd failed, err:%v\n", err)
        return
    }
    fmt.Println("zadd success")

    // 把Golang的分数加10
    newScore, err := rdb.ZIncrBy(ctx, zsetKey, 10.0, "Golang").Result()
    if err != nil {
        fmt.Printf("zincrby failed, err:%v\n", err)
        return
    }
    fmt.Printf("Golang's score is %f now.\n", newScore)

    // 取分数最高的3个
    ret := rdb.ZRevRangeWithScores(ctx, zsetKey, 0, 2).Val()
    for _, z := range ret {
        fmt.Println(z.Member, z.Score) 
    }

    // 取95~100分的
    op := &redis.ZRangeBy{
        Min: "95",
        Max: "100",
    }
    ret, err = rdb.ZRangeByScoreWithScores(ctx, zsetKey, op).Result()
    if err != nil {
        fmt.Printf("zrangebyscore failed, err:%v\n", err)
        return
    }
    for _, z := range ret {
        fmt.Println(z.Member, z.Score)
    }
}

执行上面的函数将得到如下输出结果。

zadd success
Golang's score is 100.000000 now.
Golang 100
C/C++ 99
Java 98
Python 95
JavaScript 97
Java 98
C/C++ 99
Golang 100

扫描或遍历所有key

你可以使用KEYS prefix:* 命令按前缀获取所有 key。

vals, err := rdb.Keys(ctx, "prefix*").Result()

但是如果需要扫描数百万的 key ,那速度就会比较慢。这种场景下你可以使用Scan 命令来遍历所有符合要求的 key。

// scanKeysDemo1 按前缀查找所有key示例
func scanKeysDemo1() {
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()

    var cursor uint64
    for {
        var keys []string
        var err error
        // 按前缀扫描key
        keys, cursor, err = rdb.Scan(ctx, cursor, "prefix:*", 0).Result()
        if err != nil {
            panic(err)
        }

        for _, key := range keys {
            fmt.Println("key", key)
        }

        if cursor == 0 { // no more keys
            break
        }
    }
}

Go-redis 允许将上面的代码简化为如下示例。

// scanKeysDemo2 按前缀扫描key示例
func scanKeysDemo2() {
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()
    // 按前缀扫描key
    iter := rdb.Scan(ctx, 0, "prefix:*", 0).Iterator()
    for iter.Next(ctx) {
        fmt.Println("keys", iter.Val())
    }
    if err := iter.Err(); err != nil {
        panic(err)
    }
}

例如,我们可以写出一个将所有匹配指定模式的 key 删除的示例。

// delKeysByMatch 按match格式扫描所有key并删除
func delKeysByMatch(match string, timeout time.Duration) {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    iter := rdb.Scan(ctx, 0, match, 0).Iterator()
    for iter.Next(ctx) {
        err := rdb.Del(ctx, iter.Val()).Err()
        if err != nil {
            panic(err)
        }
    }
    if err := iter.Err(); err != nil {
        panic(err)
    }
}

此外,对于 Redis 中的 set、hash、zset 数据类型,go-redis 也支持类似的遍历方法。

iter := rdb.SScan(ctx, "set-key", 0, "prefix:*", 0).Iterator()
iter := rdb.HScan(ctx, "hash-key", 0, "prefix:*", 0).Iterator()
iter := rdb.ZScan(ctx, "sorted-hash-key", 0, "prefix:*", 0).Iterator(

Pipeline

Redis Pipeline 允许通过使用单个 client-server-client 往返执行多个命令来提高性能。区别于一个接一个地执行100个命令,你可以将这些命令放入 pipeline 中,然后使用1次读写操作像执行单个命令一样执行它们。这样做的好处是节省了执行命令的网络往返时间(RTT)。

y在下面的示例代码中演示了使用 pipeline 通过一个 write + read 操作来执行多个命令。

pipe := rdb.Pipeline()

incr := pipe.Incr(ctx, "pipeline_counter")
pipe.Expire(ctx, "pipeline_counter", time.Hour)

cmds, err := pipe.Exec(ctx)
if err != nil {
    panic(err)
}

// 在执行pipe.Exec之后才能获取到结果
fmt.Println(incr.Val())

上面的代码相当于将以下两个命令一次发给 Redis Server 端执行,与不使用 Pipeline 相比能减少一次RTT。

INCR pipeline_counter
EXPIRE pipeline_counts 3600

或者,你也可以使用Pipelined 方法,它会在函数退出时调用 Exec。

var incr *redis.IntCmd

cmds, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
    incr = pipe.Incr(ctx, "pipelined_counter")
    pipe.Expire(ctx, "pipelined_counter", time.Hour)
    return nil
})
if err != nil {
    panic(err)
}

// 在pipeline执行后获取到结果
fmt.Println(incr.Val())

我们可以遍历 pipeline 命令的返回值依次获取每个命令的结果。下方的示例代码中使用pipiline一次执行了100个 Get 命令,在pipeline 执行后遍历取出100个命令的执行结果。

cmds, err := rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error {
    for i := 0; i < 100; i++ {
        pipe.Get(ctx, fmt.Sprintf("key%d", i))
    }
    return nil
})
if err != nil {
    panic(err)
}

for _, cmd := range cmds {
    fmt.Println(cmd.(*redis.StringCmd).Val())
}

在那些我们需要一次性执行多个命令的场景下,就可以考虑使用 pipeline 来优化。

事务

Redis 是单线程执行命令的,因此单个命令始终是原子的,但是来自不同客户端的两个给定命令可以依次执行,例如在它们之间交替执行。但是,Multi/exec能够确保在multi/exec两个语句之间的命令之间没有其他客户端正在执行命令

在这种场景我们需要使用 TxPipeline 或 TxPipelined 方法将 pipeline 命令使用 MULTIEXEC包裹起来。

// TxPipeline demo
pipe := rdb.TxPipeline()
incr := pipe.Incr(ctx, "tx_pipeline_counter")
pipe.Expire(ctx, "tx_pipeline_counter", time.Hour)
_, err := pipe.Exec(ctx)
fmt.Println(incr.Val(), err)

// TxPipelined demo
var incr2 *redis.IntCmd
_, err = rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
    incr2 = pipe.Incr(ctx, "tx_pipeline_counter")
    pipe.Expire(ctx, "tx_pipeline_counter", time.Hour)
    return nil
})
fmt.Println(incr2.Val(), err)

上面代码相当于在一个RTT下执行了下面的redis命令:

MULTI
INCR pipeline_counter
EXPIRE pipeline_counts 3600
EXEC

Watch

我们通常搭配 WATCH命令来执行事务操作。从使用WATCH命令监视某个 key 开始,直到执行EXEC命令的这段时间里,如果有其他用户抢先对被监视的 key 进行了替换、更新、删除等操作,那么当用户尝试执行EXEC的时候,事务将失败并返回一个错误,用户可以根据这个错误选择重试事务或者放弃事务。

Watch方法接收一个函数和一个或多个key作为参数。

Watch(fn func(*Tx) error, keys ...string) error

下面的代码片段演示了 Watch 方法搭配 TxPipelined 的使用示例。

// watchDemo 在key值不变的情况下将其值+1
func watchDemo(ctx context.Context, key string) error {
    return rdb.Watch(ctx, func(tx *redis.Tx) error {
        n, err := tx.Get(ctx, key).Int()
        if err != nil && err != redis.Nil {
            return err
        }
        // 假设操作耗时5秒
        // 5秒内我们通过其他的客户端修改key,当前事务就会失败
        time.Sleep(5 * time.Second)
        _, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
            pipe.Set(ctx, key, n+1, time.Hour)
            return nil
        })
        return err
    }, key)
}

将上面的函数执行并打印其返回值,如果我们在程序运行后的5秒内修改了被 watch 的 key 的值,那么该事务操作失败,返回redis: transaction failed错误。

最后我们来看一个 go-redis 官方文档中使用 GETSETWATCH命令实现一个 INCR 命令的完整示例。

const routineCount = 100

increment := func(key string) error {
    txf := func(tx *redis.Tx) error {
        // 获得当前值或零值
        n, err := tx.Get(key).Int()
        if err != nil && err != redis.Nil {
            return err
        }

        // 实际操作(乐观锁定中的本地操作)
        n++

        // 仅在监视的Key保持不变的情况下运行
        _, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
            // pipe 处理错误情况
            pipe.Set(key, n, 0)
            return nil
        })
        return err
    }

    for retries := routineCount; retries > 0; retries-- {
        err := rdb.Watch(txf, key)
        if err != redis.TxFailedErr {
            return err
        }
        // 乐观锁丢失
    }
    return errors.New("increment reached maximum number of retries")
}

var wg sync.WaitGroup
wg.Add(routineCount)
for i := 0; i < routineCount; i++ {
    go func() {
        defer wg.Done()

        if err := increment("counter3"); err != nil {
            fmt.Println("increment error:", err)
        }
    }()
}
wg.Wait()

n, err := rdb.Get("counter3").Int()
fmt.Println("ended with", n, err)

在这个示例中使用了 redis.TxFailedErr 来检查事务是否失败。


文章作者:
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 !
  目录