DB-API是一个规范,它定义一个系列必须的对象和数据库存取方式,以便各种各样的底层数据库系统和多种多样的数据库接口程序提供一致的访问接口.

通用步骤:

1.引入模块
2.获取与数据库的连接
3.执行SQL语句和存储过程
4.关闭数据库连接

SQL Server数据库

++ pymssql 官方文档

pip install pymssql 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import pymssql 

server = "187.32.43.13" # 连接服务器地址
user = "root"         # 连接帐号
password = "1234"      # 连接密码

conn = pymssql.connect(server, user, password, "连接默认数据库名称") #获取连接

cursor = conn.cursor() # 获取光标

# 创建表
cursor.execute("""
IF OBJECT_ID('persons', 'U') IS NOT NULL
DROP TABLE persons
CREATE TABLE persons (
id INT NOT NULL,
name VARCHAR(100),
salesrep VARCHAR(100),
PRIMARY KEY(id)
)
""")  

# 插入多行数据
cursor.executemany(
"INSERT INTO persons VALUES (%d, %s, %s)",
[(1, 'John Smith', 'John Doe'),
(2, 'Jane Doe', 'Joe Dog'),
(3, 'Mike T.', 'Sarah H.')])

# 你必须调用 commit() 来保持你数据的提交如果你没有将自动提交设置为true
conn.commit()

# 查询数据
cursor.execute('SELECT * FROM persons WHERE salesrep=%s', 'John Doe')

# 遍历数据(存放到元组中) 方式1
row = cursor.fetchone()
while row:
print("ID=%d, Name=%s" % (row[0], row[1]))
row = cursor.fetchone()

# 遍历数据(存放到元组中) 方式2
for row in cursor:
print('row = %r' % (row,))


# 遍历数据(存放到字典中)
# cursor = conn.cursor(as_dict=True)
#
# cursor.execute('SELECT * FROM persons WHERE salesrep=%s', 'John Doe')
# for row in cursor:
# print("ID=%d, Name=%s" % (row['id'], row['name']))
#
# conn.close()

# 关闭连接
conn.close()

# 注:在任何时候,在一个连接下,一次正在执行的数据库操作只会出现一个cursor对象

with 来避免手动关闭cursors和connection连接

1
2
3
4
5
6
7
8
9
10
11
import pymssql 

server = "187.32.43.13" # 连接服务器地址
user = "root"         # 连接帐号
password = "1234"      # 连接密码

with pymssql.connect(server, user, password, "你的连接默认数据库名称") as conn:
with conn.cursor(as_dict=True) as cursor: # 数据存放到字典中
cursor.execute('SELECT * FROM persons WHERE salesrep=%s', 'John Doe')
for row in cursor:
print("ID=%d, Name=%s" % (row['id'], row['name']))

调用存储空间

1
2
3
4
5
6
7
8
9
10
11
12
with pymssql.connect(server, user, password, "tempdb") as conn:
with conn.cursor(as_dict=True) as cursor:
cursor.execute("""
CREATE PROCEDURE FindPerson
@name VARCHAR(100)
AS BEGIN
SELECT * FROM persons WHERE name = @name
END
""")
cursor.callproc('FindPerson', ('Jane Doe',))
for row in cursor:
print("ID=%d, Name=%s" % (row['id'], row['name']))

_mssql连接sql server数据库并实现操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import _mssql

# 创建连接
conn = _mssql.connect(server='SQL01', user='user', password='password', \
database='mydatabase')
print(conn.timeout)
print(conn.login_timeout)

# 创建table
conn.execute_non_query('CREATE TABLE persons(id INT, name VARCHAR(100))')

# insert数据
conn.execute_non_query("INSERT INTO persons VALUES(1, 'John Doe')")
conn.execute_non_query("INSERT INTO persons VALUES(2, 'Jane Doe')")

# 查询操作
conn.execute_query('SELECT * FROM persons WHERE salesrep=%s', 'John Doe')
for row in conn:
print "ID=%d, Name=%s" % (row['id'], row['name'])

#查询数量count()
numemployees = conn.execute_scalar("SELECT COUNT(*) FROM employees")

# 查询一条数据
employeedata = conn.execute_row("SELECT * FROM employees WHERE id=%d", 13)

# 带参数查询的几个例子:
conn.execute_query('SELECT * FROM empl WHERE id=%d', 13)
conn.execute_query('SELECT * FROM empl WHERE name=%s', 'John Doe')
conn.execute_query('SELECT * FROM empl WHERE id IN (%s)', ((5, 6),))
conn.execute_query('SELECT * FROM empl WHERE name LIKE %s', 'J%')
conn.execute_query('SELECT * FROM empl WHERE name=%(name)s AND city=%(city)s', \
{ 'name': 'John Doe', 'city': 'Nowhere' } )
conn.execute_query('SELECT * FROM cust WHERE salesrep=%s AND id IN (%s)', \
('John Doe', (1, 2, 3)))
conn.execute_query('SELECT * FROM empl WHERE id IN (%s)', (tuple(xrange(4)),))
conn.execute_query('SELECT * FROM empl WHERE id IN (%s)', \
(tuple([3, 5, 7, 11]),))

#关闭连接
conn.close()

++ pyodbc

pip install pyodbc

修改配置文件 /etc/odbcinst.ini

[SQL Server]
Description     = FreeTDS ODBC driver for MSSQL
Driver          = /usr/lib64/libtdsodbc.so
Setup           = /usr/lib64/libtdsS.so.2
FileUsage       = 1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import pyodbc

#创建数据库连接
conn = pyodbc.connect("DRIVER={SQL Server};SERVER=192.168.142.11,1433;
DATABASE=master;UID=sa;PWD=hu_wen@2019")
cursor = conn.cursor() # 获取光标

# 创建表
cursor.execute("""
IF OBJECT_ID('persons', 'U') IS NOT NULL
DROP TABLE persons
CREATE TABLE persons (
id INT NOT NULL,
name VARCHAR(100),
salesrep VARCHAR(100),
PRIMARY KEY(id)
)
""")

# 插入多行数据
cursor.executemany(
"INSERT INTO persons VALUES (%d, '%s', '%s')",
[(1, 'John Smith', 'John Doe'),
(2, 'Jane Doe', 'Joe Dog'),
(3, 'Mike T.', 'Sarah H.')])

# 提交数据
conn.commit()

# 查询数据
cursor.execute('SELECT * FROM persons WHERE salesrep=%s', 'John Doe')

# 遍历数据(存放到元组中) 方式1
row = cursor.fetchone()
while row:
print("ID=%d, Name=%s" % (row[0], row[1]))
row = cursor.fetchone()

# 遍历数据(存放到元组中) 方式2
for row in cursor:
print('row = %r' % (row,))

# 关闭连接

Mysql数据库

pip install pymysql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
from myconn import myconn

class Sql:
def __init__(self,db=None):
# 初始化连接数据库
self.conn = pymysql.connect(db=dataName, user="user", passwd="passwd",
host="host",charset='utf8')

def get_databases(self):
''' 获取所有数据库名 '''
return self.getSqlData("show databases")

def get_tables(self):
''' 获取所有表名 '''
return self.getSqlData("show tables")

def create_table(self,table):
''' 创建表 '''
cur = self.conn.cursor()
# id:无符号整数,主键,自增;name:字符串;age:无符号整数
sql = """create table {} (id int unsigned primary key auto_increment name varchar(10),
age int unsigned)""".format(table)
cur.execute(sql)
self.conn.commit()

def insert_data(self,table):
''' 增加一条数据到数据表 '''
cur = self.conn.cursor()
sql = "insert into {}(name,age) values(%s,%s)"
cur.execute(sql,('myName',80))
self.conn.commit()

def update_data(self,table):
''' 修改表中的数据 '''
cur = self.conn.cursor()
sql = "update {} set age=18 where name='myName'".format(table)
cur.execute(sql)
self.conn.commit()

def select_data(self, table):
''' 从数据库表查询数据 '''
cur = self.conn.cursor()
sql = "select name,age from {}".format(table)
cur.execute(sql)
return cur.fetchall()

def delete_data(self, table):
''' 从数据库表删除数据 '''
cur = self.conn.cursor()
sql = "delete from {} where name='myName'".format(table)
cur.execute(sql)
self.conn.commit()


def get_fields(self,table):
''' 获取指定表的字段 '''
cur = self.conn.cursor()
sql = "SELECT * FROM {} LIMIT 1".format(table)
cur.execute(sql)
v = cur.description
zds = [i[0] for i in v]
self.conn.commit()
return zds


def unique(self,table,*fields):
''' 唯一设置
table:表名,fields:字段名列表; '''
cur = self.conn.cursor()
if len(fields) == 1:
sql = "ALTER TABLE {} ADD unique(".format(table)
else:
sql = "ALTER TABLE {} ADD UNIQUE KEY(".format(table)
for i in fields:
sql += i
if i != fields[-1]:
sql += ','
else:
sql += ')'
try:
cur.execute(sql)
except Exception as exc:
print(exc)
else:
self.conn.commit()

def closeSql(self):
''' 关闭数据库连接 '''
self.conn.close()

MongoDB

++ MongoDB官网下载

pip install pymongo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import pymongo
import datetime

class Mongodb:
""" Python MangoDB 的简单操作 """

def __init__(self):
# 1、建立连接
# client = pymongo.MongoClient('localhost',27017) # 第一种方法
client = pymongo.MongoClient('mongodb://localhost:27017') # 第二种方法

# 2、获取数据库
# db = client.db_name # 第一种方法
db = client['db_name'] # 第二种方法

# 3、获取一个集合
# self.collection = db.table # 第一种方法
self.collection = db['table'] # 第二种方法

def insert_data(self):
''' 插入文档 '''
# name:姓名;age:年龄;datetime:存储时间
user = {
"name":"myName",
"age":18,
"datetime":datetime.datetime.utcnow()
}
# 插入后,如果文档内没有_id这个键值,则系统会自动添加_id;其中user可以是多个user组成的列表
user_id = self.collection.insert(user)
return user_id

def find_one_data(self):
''' 查询一条文档 '''
data = self.collection.find_one({"name":"myName"}) # 查询name为myName的
return data

def find_data(self):
''' 查询多个文档 '''
# data = [d for d in self.collection.find()] # 查询所有
# 查询所有指定name的文档
data = [d for d in self.collection.find({"name":"myName"})]
return data

def find_limit_data(self):
''' 根据条件查询数据:
MongoDB中条件操作符有:
(>) 大于 - $gt
(<) 小于 - $lt
(>=) 大于等于 - $gte
(<= ) 小于等于 - $lte '''
data = self.collection.find({"age":{"$gt": 12}}) # 查询年龄大于12岁的
return data

def get_size(self):
''' 查询符合条件的文档条数 '''
size = self.collection.find({"name":"myName"}).count()
return size

def get_names(self):
''' 查询所有 name 的值,不重复的。返回list '''
names = self.collection.distinct('name')
return names

def update_data(self):
''' 修改文档 '''
self.collection.update({"name":"myName"},{"$set":{"age":28}})

def delete_data(self):
''' 删除文档 '''
self.collection.remove({"name":"myName"})

Redis 数据库

pip install redis
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import redis

class MyRedis:
""" Redis 数据库简单使用 """
def __init__(self):
        # 1、建立 Redis连接
# 第一种方式
# self.red = redis.Redis(host='localhost',port=6379,db=0)
# 第二种方式(以连接池的方式)
pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
        self.red = redis.Redis(connection_pool=pool)

def insert_data(self):
''' 插入数据 '''
self.red.set("key","value")

def get_data(self):
''' 查询数据 '''
data = self.red.get("key")
return data

def delete_data(self):
''' 删除数据 '''
self.red.delete('key')

Redis 之 消息推送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#### 服务端发布数据
import redis
import time
from datetime import datetime

def resis_server(host,port):
pool = redis.ConnectionPool(host=host,port=port,db=1)
r = redis.StrictRedis(connection_pool=pool)
count = 1
while True:
dt = datetime.now()
print(dt) # 要发布的数据
# 发布到指定频道
r.publish('channel_name',dt)
if count >= 10:
# 只是为了测试,所以限制为10次后,发布结束通知
r.publish('channel_name','end')
print('停止发布')
break
time.sleep(1)
count += 1


if __name__ == '__main__':
resis_server('192.168.88.64',6379)

#### 客户端接收数据
import redis

def redis_client(host,port):
pool = redis.ConnectionPool(host=host,port=port,db=1)
r = redis.StrictRedis(connection_pool=pool)
p = r.pubsub()
p.subscribe('channel_name') # 订阅频道
for item in p.listen():
print(item) # 输出字典
if item['type'] == 'message':
data = item['data']
print(data) # 输出发送过来的消息
if item['data'] == b'end':
break
p.unsubscribe('channel_name')
print('取消订阅')

if __name__ == '__main__':
redis_client('192.168.88.64',6379)

hdf5 快速存取数据

pip install h5py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import h5py

def write_h5():
''' 写入数据,写入一个list '''
h5 = h5py.File('test.hdf5','w')
h5['/test/t1'] = [1,2,3,4,5]
h5.close()

def read_h5():
''' 读取数据 '''
h5 = h5py.File('test.hdf5','r')
v = h5['/test/t1'].value
h5.close()
return v

if __name__ == '__main__':
write_h5()
v = read_h5()
print(v)

Oracle 数据库

1.用户名,密码和监听分别作为参数

conn=cx_Oracle.connect('用户名','密码','数据库地址:数据库端口/SID')

2.用户名,密码和监听共同作为一个参数

conn=cx_Oracle.connect('用户名/密码@数据库地址:数据库端口/SID')

3.使用tns配置信息

tns=cx_Oracle.makedsn('数据库地址','数据库端口', 'SID')
conn=cx_Oracle.connect('用户名','密码',tns)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#coding:utf-8
import cx_Oracle
# 封装的类
class cxOracle:
'''
tns的取值tnsnames.ora对应的配置项的值,如:
tns = '(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)
(HOST=10.16.18.23)(PORT=1521)))(CONNECT_DATA=(SERVICE_NAME=MYDB)))'
'''
def __init__(self ,uname, upwd,tns ):
self ._uname = uname
self ._upwd = upwd
self ._tns = tns
self ._conn = None
self ._ReConnect()
def _ReConnect(self ):
if not self._conn :
self ._conn = cx_Oracle.connect (self. _uname, self ._upwd, self._tns)
else:
pass
def __del__(self ):
if self. _conn:
self ._conn. close()
self ._conn = None
def _NewCursor(self ):
cur = self. _conn.cursor ()
if cur:
return cur
else:
print "#Error# Get New Cursor Failed."
return None
def _DelCursor(self , cur):
if cur:
cur .close()
# 检查是否允许执行的sql语句
def _PermitedUpdateSql(self ,sql):
rt = True
lrsql = sql. lower()
sql_elems = [ lrsql.strip ().split()]
# update和delete最少有四个单词项
if len( sql_elems) < 4 :
rt = False
# 更新删除语句,判断首单词,不带where语句的sql不予执行
elif sql_elems[0] in [ 'update', 'delete']:
if 'where' not in sql_elems :
rt = False
return rt
# 导出结果为文件
def Export(self , sql, file_name, colfg ='||'):
rt = self. Query(sql )
if rt:
with open( file_name, 'a') as fd:
for row in rt:
ln_info = ''
for col in row:
ln_info += str( col) + colfg
ln_info += '\n'
fd .write( ln_info)
# 查询
def Query(self , sql, nStart=0 , nNum=- 1):
rt = []
# 获取cursor
cur = self. _NewCursor()
if not cur:
return rt
# 查询到列表
cur .execute(sql)
if ( nStart==0 ) and (nNum==1 ):
rt .append( cur.fetchone ())
else:
rs = cur. fetchall()
if nNum==- 1:
rt .extend( rs[nStart:])
else:
rt .extend( rs[nStart:nStart +nNum])
# 释放cursor
self ._DelCursor(cur)
return rt
# 更新
def Exec(self ,sql):
# 获取cursor
rt = None
cur = self. _NewCursor()
if not cur:
return rt
# 判断sql是否允许其执行
if not _PermitedUpdateSql(sql ):
return rt
# 执行语句
rt = cur. execute(sql )
# 释放cursor
self ._DelCursor(cur)
return rt

# 类使用示例
tns = '(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=10.16.17.46)
(PORT=1521)))(CONNECT_DATA=(SERVICE_NAME=MYDB)))'
ora = cxOracle ('nicker', '123456', tns)
# 导出结果为文件
rs = ora .Export("SELECT * FROM org", '1.txt')
# 查询结果到列表
rs = ora.Query("SELECT * FROM org")
print rs
# 更新数据
ora.Exec("update org set org_name='NewNameForUpdate' where org_id=123456;")