當前位置:首頁 » 數據倉庫 » pythonoracle資料庫查詢
擴展閱讀
webinf下怎麼引入js 2023-08-31 21:54:13
堡壘機怎麼打開web 2023-08-31 21:54:11

pythonoracle資料庫查詢

發布時間: 2023-01-21 03:11:19

Ⅰ 怎麼用Python腳本怎麼從oracle資料庫中取出clob數據

stmt = con.prepareStatement("select attach,fjmc,piid,swsj fromreceiveFile");//attach是clolb對象
rs = stmt.executeQuery( );
while (rs.next()) {
java.sql.Blob blob = rs.getBlob(1);//這一句可獲得blob,clob等對象。

然後再把blob轉成文件

File file = new File("G:\\XiangMu_dwoa\\資料庫文件資料\\aaa");
OutputStream fout = new FileOutputStream(file);
//下面將BLOB數據寫入文件
byte[] b = new byte[1024];
int len = 0;
while ( (len = ins.read(b)) != -1) {
fout.write(b, 0, len);

你可以參考一下

Ⅱ python如何保存從oracle資料庫中讀取的BLOB文件

import cx_Oracle

con = cx_Oracle.connect(『username』, 『password』, 『dsn』)

blob_sql = "select column_name from table where clause"

cursor = con.cursor()

cursor.execute(blob_sql)

result = cursor.fetchall()

file = open('file_name', "wb")

file.write(result[0][0].read()) #可以print查看result的內容,根據實際情況read

file.close()

Ⅲ Python 進行 Oracle 與 Mysql 不同資料庫類型之間的數據 diff


項目工作中,可能會有 A 類型資料庫數據需要遷移到 B 類型的資料庫中的需求。 例如:假設現有一個資料庫的遷移需求,是將 Oracle 資料庫里的數據遷移至 Mysql 資料庫中。 常規的測試方法是人工去抽樣檢測數據進行 diff,因為數據量太大,人工不可能實現全量數據的肉眼 diff。

因此,為提高數據 diff 的正確性以及測試效率,編寫測試腳本進行全量遷移數據的 diff 是十分必要的。以下內容將會著重講解如何使用 Python 編寫腳本來實現 Oracle 與 Mysql 這種不同類型資料庫之間數據的 diff。


1、連接oracle資料庫並獲取要提取的數據,並輸出列表裡面嵌入的字典類型的數據

2、連接mysql資料庫並獲取要提取的數據,並輸出列表裡面嵌入的字典類型的數據

3、Oracle 與 Mysql 資料庫進行 diff,直接調用上面兩個方法即可

4、執行後的輸出結果


Ⅳ python連接oracle資料庫報出 ORA-12541: TNS: 無監聽程序

方法一:
在oracle_home下找到lsnrctl.exe 輸入 start
方法二
可能認不到實例名
在cmd下運行
set oracle_sid=自己資料庫的實例名(大多數orcl)
網上還有很多解決辦法
也許不見得管用,這種問題具體問題具體分析比較好。介紹的這2種您的機器還不行。就去修改一個TNSNAMES.ORA。

Ⅳ 如何使用Python連接Oracle資料庫

  1. 下載cx_Oracle,下載之後就可以使用了。

  2. 簡單的使用流程如下:

  3. 1.引用模塊cx_Oracle
    2.連接資料庫
    3.獲取cursor
    4.使用cursor進行各種操作
    5.關閉cursor
    6.關閉連接

    參考代碼:

    import cx_Oracle #引用模塊cx_Oracle
    conn=cx_Oracle.connect('load/123456@localhost/ora11g') #連接資料庫
    c=conn.cursor() #獲取cursor
    x=c.execute('select sysdate from al') #使用cursor進行各種操作
    x.fetchone()
    c.close() #關閉cursor
    conn.close() #關閉連接

Ⅵ python如何自動獲取oracle資料庫中所有表的表結構

你看你怎麼調用這個sql語句吧

selecta.owner所屬用戶,
a.table_name表名,
a.column_name欄位名,
a.data_type欄位類型,
a.欄位長度,
a.欄位精度,
a.是否為空,
a.創建日期,
a.最後修改日期,
casewhena.owner=d.owneranda.table_name=d.table_nameanda.column_name=d.column_namethen'主鍵'else''end是否主鍵
from
(selecta.owner,a.table_name,b.column_name,b.data_type,casewhenb.data_precisionisnullthenb.data_lengthelsedata_precisionend欄位長度,data_scale欄位精度,
decode(nullable,'Y','√','N','×')是否為空,c.created創建日期,c.last_ddl_time最後修改日期
fromall_tablesa,all_tab_columnsb,all_objectsc
wherea.table_name=b.table_nameanda.owner=b.owner
anda.owner=c.owner
anda.table_name=c.object_name
anda.owner='SCOTT'--這個是查某個用戶,你到時候把用戶名換一下就好,一定大寫
andc.object_type='TABLE')a
leftjoin
(selecta.owner,a.table_name,a.column_name,a.constraint_namefromuser_cons_columnsa,user_constraintsb
wherea.constraint_name=b.constraint_nameandb.constraint_type='P')d
ona.owner=d.owneranda.table_name=d.table_nameanda.column_name=d.column_name
orderbya.owner,a.table_name;

Ⅶ python使用oracle查詢資料庫,查詢語句中使用變數值

cursor.execute('select * from INV.MTL_ITEM_REVISIONS where ROW_ID= %s'% (Item,))

換為:
qry_sql = "select * from INV.MTL_ITEM_REVISIONS where ROW_ID= '%s'" % Item
cursor.execute(qry_sql)

Ⅷ python3.6 cx_oracle連接資料庫報編碼錯UnicodeDecodeError

我說下我遇到的情況

資料庫字元集是 ZHS16GBK

錯誤的情況是

UnicodeDecodeError:'gbk'codeccan'tdecodebyte0xa7inposition12:illegalmultibytesequence

經過檢查,在fetchall()獲取記錄時,查詢到的記錄裡面有亂碼(應該是不包含在資料庫現有字元集下的字元)

臨時的一個解決辦法是

db=cx_Oracle.connect(dblink,encoding='UTF-8')

這樣可以讀取了,讀取到的內容為

廣州市ue738同泰路

其中 'ue738'應該是之前不可被讀取的字元,希望對各位有幫助

Ⅸ python oracle 查詢結果怎麼映射為對象

安裝好了cx_Oracle.msi MySQL.msi 下載安裝 xlwt-0.7.5.tar.gz, 到安裝目錄下 命令窗口cmd下執行 Python setup.py install即可
被引用的文件:
[html] view plain
# coding: utf-8
# xlswriter.py
# http://pypi.python.org/pypi/xlwt
import xlwt
class XLSWriter(object):
"""A XLS writer that proces XLS files from unicode data.
"""
def __init__(self, file, encoding='utf-8'):
# must specify the encoding of the input data, utf-8 default.
self.file = file
self.encoding = encoding
self.wbk = xlwt.Workbook()
self.sheets = {}
def create_sheet(self, sheet_name='sheet'):
"""Create new sheet
"""
if sheet_name in self.sheets:
sheet_index = self.sheets[sheet_name]['index'] + 1
else:
sheet_index = 0
self.sheets[sheet_name] = {'header': []}
self.sheets[sheet_name]['index'] = sheet_index
self.sheets[sheet_name]['sheet'] = self.wbk.add_sheet('%s%s' % (sheet_name, sheet_index if sheet_index else ''), cell_overwrite_ok=True)
self.sheets[sheet_name]['rows'] = 0
def cell(self, s):
if isinstance(s, basestring):
if not isinstance(s, unicode):
s = s.decode(self.encoding)
elif s is None:
s = ''
else:
s = str(s)
return s
def writerow(self, row,xlsstyle, sheet_name='sheet'):
if sheet_name not in self.sheets:
# Create if does not exist
self.create_sheet(sheet_name)
if self.sheets[sheet_name]['rows'] == 0:
self.sheets[sheet_name]['header'] = row
if self.sheets[sheet_name]['rows'] >= 65534:
self.save()
# create new sheet to avoid being greater than 65535 lines
self.create_sheet(sheet_name)
if self.sheets[sheet_name]['header']:
self.writerow(self.sheets[sheet_name]['header'], sheet_name)
for ci, col in enumerate(row):
#self.sheets[sheet_name]['sheet'].col(col).width=0x0d00
self.sheets[sheet_name]['sheet'].write(self.sheets[sheet_name]['rows'], ci, self.cell(col) if type(col) != xlwt.ExcelFormula.Formula else col,xlsstyle)
self.sheets[sheet_name]['rows'] += 1
def writerows(self, rows,style, sheet_name='sheet'):
for row in rows:
self.writerow(row,style, sheet_name)
def save(self):
self.wbk.save(self.file)
if __name__ == '__main__':
# test
xlswriter = XLSWriter(u'陝西.xls')
ft=xlwt.Font()
ft.height =0x00C8
ft.bold = True
ft1=xlwt.Font()
ft1.bold=False
style0=xlwt.XFStyle()
style0.font=ft
style1=xlwt.XFStyle()
style1.font=ft1
xlswriter.writerow(['姓名', '年齡', '電話', 'QQ'], style0,sheet_name=u'基本信息')
xlswriter.writerow(['張三', '30', '13512345678', '123456789'],style1, sheet_name=u'基本信息')
xlswriter.writerow(['學校', '獲得學位', '取得學位時間'], style0,sheet_name=u'學習經歷')
xlswriter.writerow(['西安電子科技大學', '學士', '2009'],style1, sheet_name=u'學習經歷')
xlswriter.writerow(['西安電子科技大學', '碩士', '2012'], style1,sheet_name=u'學習經歷')
xlswriter.writerow(['王五', '30', '13512345678', '123456789'],style1, sheet_name=u'基本信息')
# don't forget to save data to disk
xlswriter.save()
print 'finished.'
連接Oracle並生成excel
[python] view plain
#! /usr/bin/env python
#coding=utf-8
import xlwt,cx_Oracle,datetime,MySQLdb
from XLSWriter import XLSWriter
__s_date = datetime.date(1899, 12, 31).toordinal()-1
'''''
Excel中的日期為浮點數則轉為標准日期格式
'''
def getdate(date):
if isinstance(date, float):
date = int(date)
d = datetime.date.fromordinal(__s_date + date)
return d.strftime("%Y%m%d")
def getYesterday():
'''''
昨天
'''
today=datetime.date.today()
oneday=datetime.timedelta(days=1)
yesterday=today-oneday
return yesterday
print getYesterday().strftime("%Y-%m-%d")
'''''
獲取GIPAP、TIPAP新批再批患者名單
'''
def getGipapTipapNewReactivePass (sql):
try:
db=cx_Oracle.connect("user","pwd",'192.168.1.1:1521/orcl')
cursor=db.cursor()
SQLTEXT=sql
rslist=[]
rs=cursor.execute(SQLTEXT)
rslist=rs.fetchall()
except MySQLdb.Error,e:
print "Mysql Error %d: %s" % (e.args[0], e.args[1])
cursor.close()
db.close()
return rslist
'''''
將查詢結果集寫入xls文件
'''
def writeDateToXls(xlaname,style,paptype,papname):
gsql=u"select t.pchinesename,t.pmobile,t.pphone,t.pplanbegindate,d.dname from (select m.mrpatient,m.mrplanbegintime,m.mrplanendtime,m.mrendtime,m.mrbegintime from tb_ m "
mailtype=1
newplan=' '
#注意,這里的變數passtype passpap 即為導出後的excel前兩列值 ulipad編輯器此處不能用中文,未解決 經測試 EitPlus編輯器正常 如:passtype=u'再批'
passtype='Reactive'
domain=1
passpap='gipap'
gsql+=u"where m.mrmailtype="+str(mailtype)+ str(newplan) +" and f_domain_by_pid(m.mrpatient)="+str(domain)+" and status='1' and m.mrendtime =(to_char(trunc(sysdate-1),'yyyy-mm-dd'))) a "
gsql+="left join tb_ t on t.pid=a.mrpatient "
gsql+="left join dm_ p on p.pid=t.pplan "
gsql+="left join tb_ e on e.eid=t.pcsa "
gsql+="left join tb_ j on j.jemployee=t.pcsa "
gsql+="left join tb_ d on d.did=j.jdepartment "
gsql+="where d.dstatus='A' and j.jstatus='A' and e.estatus='A' "
print gsql
#print papname+passtype
rslist=[]
rslist=getGipapTipapNewReactivePass(gsql)
print len(rslist)
xlswriter.writerow(['批註類型','葯品名稱','患者姓名','手機','固話','批准時間(援助開始時間)','發葯點'],style0, sheet_name=papname+passtype)
#這里設置樣式
for p in rslist:
xlswriter.writerow([passtype,passpap,
'' if p[0] is None else p[0].decode('gbk').encode('utf-8'),
'' if p[1] is None else p[1].decode('gbk').encode('utf-8'),
'' if p[2] is None else p[2].decode('gbk').encode('utf-8'),
'' if p[3] is None else p[3].decode('gbk').encode('utf-8'),
'' if p[4] is None else p[4].decode('gbk').encode('utf-8')],style, sheet_name=papname+passtype)
del rslist[:]
if __name__ == '__main__':
#don't forget to save data to disk
ft=xlwt.Font()
ft.height =0x00C8
ft.bold = True
ft1=xlwt.Font()
ft1.bold=False
style0=xlwt.XFStyle()
style0.font=ft
style1=xlwt.XFStyle()
style1.font=ft1
createdate=str(datetime.datetime.now().strftime('%Y%m%d' ))
xlsname=u'GIPAP_NEW_PATIENT_再批患者'+str(createdate)+'.xls'
#xlswriter=XLSWriter(xlsname)
xlswriter=XLSWriter(u'F:\\payton\\再批患者報告\\'+xlsname)
writeDateToXls(xlsname,style1,'Reactive','GIPAP')
xlswriter.save()
print 'finished.'
控制台輸出:
excel導出數據:

Ⅹ python用管理員身份連接oracle資料庫時報錯:

給你一個連接Oracle的公共庫:

#-*-coding:utf-8-*-
#!/usr/bin/envpython
'''
Createdon2014年8月4日

@author:188007

連接Oracle資料庫的class
'''
importos
os.environ['NLS_LANG']='SIMPLIFIEDCHINESE_CHINA.ZHS16GBK'
importcx_Oracle
importsys

classOracle:
'''

'''

def__init__(self,ip,port,db,user,pwd):
self.ip=ip
self.port=port
self.db=db
self.user=user
self.pwd=pwd

def__GetConnect(self):
"""得到連接信息返回:conn.cursor()"""
ifnotself.db:
raise(NameError,"沒有設置資料庫信息")
dsn=cx_Oracle.makedsn(self.ip,self.port,self.db)
self.conn=cx_Oracle.connect(self.user,self.pwd,dsn)
cur=self.conn.cursor()
ifnotcur:
raise(NameError,"連接資料庫失敗")
else:
returncur

defExecQuery(self,sql):
"""執行查詢語句返回的是一個包含tuple的list,list的元素是記錄行,tuple的元素是每行記錄的欄位
調用示例:
oracle=Oracle('10.27.95.253','1561','GFDMS','GFITAPPS','GFITAPPS')
resList=oracle.ExecQuery("SELECTOBJID,NUMCODE,AREACODE,AREANAME,PROVCODE,PROVNAME,CITYCODE,CITYNAME,TOWNCODE,TOWNNAME,REMARKFROMGFDMS.THZONEORDERBYNUMCODE")
for(AREANAME)inresList:
printstr(AREANAME).decode('gb2312')
"""
try:
cur=self.__GetConnect()
cur.execute(sql)
resList=cur.fetchall()

#查詢完畢後必須關閉連接
self.conn.close()
exceptException,err:
sys.exit(1)

returnresList

defExecNonQuery(self,sql):
"""執行非查詢語句
調用示例:
oracle.ExecNonQuery("insertintoTHZONEvalues('x','y')")
"""
try:
cur=self.__GetConnect()
cur.execute(sql)
self.conn.commit()
self.conn.close()
exceptException,err:
sys.exit(1)

#defmain():
##oracle=Oracle('10.27.95.253','1561','GFDMS','GFITAPPS','GFITAPPS')
###返回的是一個包含tuple的list,list的元素是記錄行,tuple的元素是每行記錄的欄位
##oracle.ExecNonQuery("insertintoTHZONEvalues('x','y')")
#oracle=Oracle('10.27.95.253','1561','GTEST','GTAPPS','GTAPPS')
#resList=oracle.ExecQuery("SELECTOBJID,NUMCODE,AREACODE,AREANAME,PROVCODE,PROVNAME,CITYCODE,CITYNAME,TOWNCODE,TOWNNAME,REMARKFROMGTEST.THZONEORDERBYNUMCODE")
#for(AREANAME)inresList:
#printstr(AREANAME).decode('utf-8')
#
#if__name__=='__main__':
#main()