当前位置:首页 » 数据仓库 » pythonoracle数据库查询
扩展阅读
webinf下怎么引入js 2023-08-31 21:54:13
堡垒机怎么打开web 2023-08-31 21:54:11

pythonoracle数据库查询

发布时间: 2023-01-21 03:11:19

Ⅰ 怎么用Python脚本怎么从oracle数据库中取出clob数据

stmt = con.prepareStatement("select attach,fjmc,piid,swsj fromreceiveFile");//attach是clolb对象
rs = stmt.executeQuery( );
while (rs.next()) {
java.sql.Blob blob = rs.getBlob(1);//这一句可获得blob,clob等对象。

然后再把blob转成文件

File file = new File("G:\\XiangMu_dwoa\\数据库文件资料\\aaa");
OutputStream fout = new FileOutputStream(file);
//下面将BLOB数据写入文件
byte[] b = new byte[1024];
int len = 0;
while ( (len = ins.read(b)) != -1) {
fout.write(b, 0, len);

你可以参考一下

Ⅱ python如何保存从oracle数据库中读取的BLOB文件

import cx_Oracle

con = cx_Oracle.connect(‘username’, ‘password’, ‘dsn’)

blob_sql = "select column_name from table where clause"

cursor = con.cursor()

cursor.execute(blob_sql)

result = cursor.fetchall()

file = open('file_name', "wb")

file.write(result[0][0].read()) #可以print查看result的内容,根据实际情况read

file.close()

Ⅲ Python 进行 Oracle 与 Mysql 不同数据库类型之间的数据 diff


项目工作中,可能会有 A 类型数据库数据需要迁移到 B 类型的数据库中的需求。 例如:假设现有一个数据库的迁移需求,是将 Oracle 数据库里的数据迁移至 Mysql 数据库中。 常规的测试方法是人工去抽样检测数据进行 diff,因为数据量太大,人工不可能实现全量数据的肉眼 diff。

因此,为提高数据 diff 的正确性以及测试效率,编写测试脚本进行全量迁移数据的 diff 是十分必要的。以下内容将会着重讲解如何使用 Python 编写脚本来实现 Oracle 与 Mysql 这种不同类型数据库之间数据的 diff。


1、连接oracle数据库并获取要提取的数据,并输出列表里面嵌入的字典类型的数据

2、连接mysql数据库并获取要提取的数据,并输出列表里面嵌入的字典类型的数据

3、Oracle 与 Mysql 数据库进行 diff,直接调用上面两个方法即可

4、执行后的输出结果


Ⅳ python连接oracle数据库报出 ORA-12541: TNS: 无监听程序

方法一:
在oracle_home下找到lsnrctl.exe 输入 start
方法二
可能认不到实例名
在cmd下运行
set oracle_sid=自己数据库的实例名(大多数orcl)
网上还有很多解决办法
也许不见得管用,这种问题具体问题具体分析比较好。介绍的这2种您的机器还不行。就去修改一个TNSNAMES.ORA。

Ⅳ 如何使用Python连接Oracle数据库

  1. 下载cx_Oracle,下载之后就可以使用了。

  2. 简单的使用流程如下:

  3. 1.引用模块cx_Oracle
    2.连接数据库
    3.获取cursor
    4.使用cursor进行各种操作
    5.关闭cursor
    6.关闭连接

    参考代码:

    import cx_Oracle #引用模块cx_Oracle
    conn=cx_Oracle.connect('load/123456@localhost/ora11g') #连接数据库
    c=conn.cursor() #获取cursor
    x=c.execute('select sysdate from al') #使用cursor进行各种操作
    x.fetchone()
    c.close() #关闭cursor
    conn.close() #关闭连接

Ⅵ python如何自动获取oracle数据库中所有表的表结构

你看你怎么调用这个sql语句吧

selecta.owner所属用户,
a.table_name表名,
a.column_name字段名,
a.data_type字段类型,
a.字段长度,
a.字段精度,
a.是否为空,
a.创建日期,
a.最后修改日期,
casewhena.owner=d.owneranda.table_name=d.table_nameanda.column_name=d.column_namethen'主键'else''end是否主键
from
(selecta.owner,a.table_name,b.column_name,b.data_type,casewhenb.data_precisionisnullthenb.data_lengthelsedata_precisionend字段长度,data_scale字段精度,
decode(nullable,'Y','√','N','×')是否为空,c.created创建日期,c.last_ddl_time最后修改日期
fromall_tablesa,all_tab_columnsb,all_objectsc
wherea.table_name=b.table_nameanda.owner=b.owner
anda.owner=c.owner
anda.table_name=c.object_name
anda.owner='SCOTT'--这个是查某个用户,你到时候把用户名换一下就好,一定大写
andc.object_type='TABLE')a
leftjoin
(selecta.owner,a.table_name,a.column_name,a.constraint_namefromuser_cons_columnsa,user_constraintsb
wherea.constraint_name=b.constraint_nameandb.constraint_type='P')d
ona.owner=d.owneranda.table_name=d.table_nameanda.column_name=d.column_name
orderbya.owner,a.table_name;

Ⅶ python使用oracle查询数据库,查询语句中使用变量值

cursor.execute('select * from INV.MTL_ITEM_REVISIONS where ROW_ID= %s'% (Item,))

换为:
qry_sql = "select * from INV.MTL_ITEM_REVISIONS where ROW_ID= '%s'" % Item
cursor.execute(qry_sql)

Ⅷ python3.6 cx_oracle连接数据库报编码错UnicodeDecodeError

我说下我遇到的情况

数据库字符集是 ZHS16GBK

错误的情况是

UnicodeDecodeError:'gbk'codeccan'tdecodebyte0xa7inposition12:illegalmultibytesequence

经过检查,在fetchall()获取记录时,查询到的记录里面有乱码(应该是不包含在数据库现有字符集下的字符)

临时的一个解决办法是

db=cx_Oracle.connect(dblink,encoding='UTF-8')

这样可以读取了,读取到的内容为

广州市ue738同泰路

其中 'ue738'应该是之前不可被读取的字符,希望对各位有帮助

Ⅸ python oracle 查询结果怎么映射为对象

安装好了cx_Oracle.msi MySQL.msi 下载安装 xlwt-0.7.5.tar.gz, 到安装目录下 命令窗口cmd下执行 Python setup.py install即可
被引用的文件:
[html] view plain
# coding: utf-8
# xlswriter.py
# http://pypi.python.org/pypi/xlwt
import xlwt
class XLSWriter(object):
"""A XLS writer that proces XLS files from unicode data.
"""
def __init__(self, file, encoding='utf-8'):
# must specify the encoding of the input data, utf-8 default.
self.file = file
self.encoding = encoding
self.wbk = xlwt.Workbook()
self.sheets = {}
def create_sheet(self, sheet_name='sheet'):
"""Create new sheet
"""
if sheet_name in self.sheets:
sheet_index = self.sheets[sheet_name]['index'] + 1
else:
sheet_index = 0
self.sheets[sheet_name] = {'header': []}
self.sheets[sheet_name]['index'] = sheet_index
self.sheets[sheet_name]['sheet'] = self.wbk.add_sheet('%s%s' % (sheet_name, sheet_index if sheet_index else ''), cell_overwrite_ok=True)
self.sheets[sheet_name]['rows'] = 0
def cell(self, s):
if isinstance(s, basestring):
if not isinstance(s, unicode):
s = s.decode(self.encoding)
elif s is None:
s = ''
else:
s = str(s)
return s
def writerow(self, row,xlsstyle, sheet_name='sheet'):
if sheet_name not in self.sheets:
# Create if does not exist
self.create_sheet(sheet_name)
if self.sheets[sheet_name]['rows'] == 0:
self.sheets[sheet_name]['header'] = row
if self.sheets[sheet_name]['rows'] >= 65534:
self.save()
# create new sheet to avoid being greater than 65535 lines
self.create_sheet(sheet_name)
if self.sheets[sheet_name]['header']:
self.writerow(self.sheets[sheet_name]['header'], sheet_name)
for ci, col in enumerate(row):
#self.sheets[sheet_name]['sheet'].col(col).width=0x0d00
self.sheets[sheet_name]['sheet'].write(self.sheets[sheet_name]['rows'], ci, self.cell(col) if type(col) != xlwt.ExcelFormula.Formula else col,xlsstyle)
self.sheets[sheet_name]['rows'] += 1
def writerows(self, rows,style, sheet_name='sheet'):
for row in rows:
self.writerow(row,style, sheet_name)
def save(self):
self.wbk.save(self.file)
if __name__ == '__main__':
# test
xlswriter = XLSWriter(u'陕西.xls')
ft=xlwt.Font()
ft.height =0x00C8
ft.bold = True
ft1=xlwt.Font()
ft1.bold=False
style0=xlwt.XFStyle()
style0.font=ft
style1=xlwt.XFStyle()
style1.font=ft1
xlswriter.writerow(['姓名', '年龄', '电话', 'QQ'], style0,sheet_name=u'基本信息')
xlswriter.writerow(['张三', '30', '13512345678', '123456789'],style1, sheet_name=u'基本信息')
xlswriter.writerow(['学校', '获得学位', '取得学位时间'], style0,sheet_name=u'学习经历')
xlswriter.writerow(['西安电子科技大学', '学士', '2009'],style1, sheet_name=u'学习经历')
xlswriter.writerow(['西安电子科技大学', '硕士', '2012'], style1,sheet_name=u'学习经历')
xlswriter.writerow(['王五', '30', '13512345678', '123456789'],style1, sheet_name=u'基本信息')
# don't forget to save data to disk
xlswriter.save()
print 'finished.'
连接Oracle并生成excel
[python] view plain
#! /usr/bin/env python
#coding=utf-8
import xlwt,cx_Oracle,datetime,MySQLdb
from XLSWriter import XLSWriter
__s_date = datetime.date(1899, 12, 31).toordinal()-1
'''''
Excel中的日期为浮点数则转为标准日期格式
'''
def getdate(date):
if isinstance(date, float):
date = int(date)
d = datetime.date.fromordinal(__s_date + date)
return d.strftime("%Y%m%d")
def getYesterday():
'''''
昨天
'''
today=datetime.date.today()
oneday=datetime.timedelta(days=1)
yesterday=today-oneday
return yesterday
print getYesterday().strftime("%Y-%m-%d")
'''''
获取GIPAP、TIPAP新批再批患者名单
'''
def getGipapTipapNewReactivePass (sql):
try:
db=cx_Oracle.connect("user","pwd",'192.168.1.1:1521/orcl')
cursor=db.cursor()
SQLTEXT=sql
rslist=[]
rs=cursor.execute(SQLTEXT)
rslist=rs.fetchall()
except MySQLdb.Error,e:
print "Mysql Error %d: %s" % (e.args[0], e.args[1])
cursor.close()
db.close()
return rslist
'''''
将查询结果集写入xls文件
'''
def writeDateToXls(xlaname,style,paptype,papname):
gsql=u"select t.pchinesename,t.pmobile,t.pphone,t.pplanbegindate,d.dname from (select m.mrpatient,m.mrplanbegintime,m.mrplanendtime,m.mrendtime,m.mrbegintime from tb_ m "
mailtype=1
newplan=' '
#注意,这里的变量passtype passpap 即为导出后的excel前两列值 ulipad编辑器此处不能用中文,未解决 经测试 EitPlus编辑器正常 如:passtype=u'再批'
passtype='Reactive'
domain=1
passpap='gipap'
gsql+=u"where m.mrmailtype="+str(mailtype)+ str(newplan) +" and f_domain_by_pid(m.mrpatient)="+str(domain)+" and status='1' and m.mrendtime =(to_char(trunc(sysdate-1),'yyyy-mm-dd'))) a "
gsql+="left join tb_ t on t.pid=a.mrpatient "
gsql+="left join dm_ p on p.pid=t.pplan "
gsql+="left join tb_ e on e.eid=t.pcsa "
gsql+="left join tb_ j on j.jemployee=t.pcsa "
gsql+="left join tb_ d on d.did=j.jdepartment "
gsql+="where d.dstatus='A' and j.jstatus='A' and e.estatus='A' "
print gsql
#print papname+passtype
rslist=[]
rslist=getGipapTipapNewReactivePass(gsql)
print len(rslist)
xlswriter.writerow(['批注类型','药品名称','患者姓名','手机','固话','批准时间(援助开始时间)','发药点'],style0, sheet_name=papname+passtype)
#这里设置样式
for p in rslist:
xlswriter.writerow([passtype,passpap,
'' if p[0] is None else p[0].decode('gbk').encode('utf-8'),
'' if p[1] is None else p[1].decode('gbk').encode('utf-8'),
'' if p[2] is None else p[2].decode('gbk').encode('utf-8'),
'' if p[3] is None else p[3].decode('gbk').encode('utf-8'),
'' if p[4] is None else p[4].decode('gbk').encode('utf-8')],style, sheet_name=papname+passtype)
del rslist[:]
if __name__ == '__main__':
#don't forget to save data to disk
ft=xlwt.Font()
ft.height =0x00C8
ft.bold = True
ft1=xlwt.Font()
ft1.bold=False
style0=xlwt.XFStyle()
style0.font=ft
style1=xlwt.XFStyle()
style1.font=ft1
createdate=str(datetime.datetime.now().strftime('%Y%m%d' ))
xlsname=u'GIPAP_NEW_PATIENT_再批患者'+str(createdate)+'.xls'
#xlswriter=XLSWriter(xlsname)
xlswriter=XLSWriter(u'F:\\payton\\再批患者报告\\'+xlsname)
writeDateToXls(xlsname,style1,'Reactive','GIPAP')
xlswriter.save()
print 'finished.'
控制台输出:
excel导出数据:

Ⅹ python用管理员身份连接oracle数据库时报错:

给你一个连接Oracle的公共库:

#-*-coding:utf-8-*-
#!/usr/bin/envpython
'''
Createdon2014年8月4日

@author:188007

连接Oracle数据库的class
'''
importos
os.environ['NLS_LANG']='SIMPLIFIEDCHINESE_CHINA.ZHS16GBK'
importcx_Oracle
importsys

classOracle:
'''

'''

def__init__(self,ip,port,db,user,pwd):
self.ip=ip
self.port=port
self.db=db
self.user=user
self.pwd=pwd

def__GetConnect(self):
"""得到连接信息返回:conn.cursor()"""
ifnotself.db:
raise(NameError,"没有设置数据库信息")
dsn=cx_Oracle.makedsn(self.ip,self.port,self.db)
self.conn=cx_Oracle.connect(self.user,self.pwd,dsn)
cur=self.conn.cursor()
ifnotcur:
raise(NameError,"连接数据库失败")
else:
returncur

defExecQuery(self,sql):
"""执行查询语句返回的是一个包含tuple的list,list的元素是记录行,tuple的元素是每行记录的字段
调用示例:
oracle=Oracle('10.27.95.253','1561','GFDMS','GFITAPPS','GFITAPPS')
resList=oracle.ExecQuery("SELECTOBJID,NUMCODE,AREACODE,AREANAME,PROVCODE,PROVNAME,CITYCODE,CITYNAME,TOWNCODE,TOWNNAME,REMARKFROMGFDMS.THZONEORDERBYNUMCODE")
for(AREANAME)inresList:
printstr(AREANAME).decode('gb2312')
"""
try:
cur=self.__GetConnect()
cur.execute(sql)
resList=cur.fetchall()

#查询完毕后必须关闭连接
self.conn.close()
exceptException,err:
sys.exit(1)

returnresList

defExecNonQuery(self,sql):
"""执行非查询语句
调用示例:
oracle.ExecNonQuery("insertintoTHZONEvalues('x','y')")
"""
try:
cur=self.__GetConnect()
cur.execute(sql)
self.conn.commit()
self.conn.close()
exceptException,err:
sys.exit(1)

#defmain():
##oracle=Oracle('10.27.95.253','1561','GFDMS','GFITAPPS','GFITAPPS')
###返回的是一个包含tuple的list,list的元素是记录行,tuple的元素是每行记录的字段
##oracle.ExecNonQuery("insertintoTHZONEvalues('x','y')")
#oracle=Oracle('10.27.95.253','1561','GTEST','GTAPPS','GTAPPS')
#resList=oracle.ExecQuery("SELECTOBJID,NUMCODE,AREACODE,AREANAME,PROVCODE,PROVNAME,CITYCODE,CITYNAME,TOWNCODE,TOWNNAME,REMARKFROMGTEST.THZONEORDERBYNUMCODE")
#for(AREANAME)inresList:
#printstr(AREANAME).decode('utf-8')
#
#if__name__=='__main__':
#main()