138 lines
4.6 KiB
Python
138 lines
4.6 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
from sqlalchemy import create_engine, MetaData
|
|
from sqlalchemy.ext.automap import automap_base
|
|
from sqlalchemy.orm import Session
|
|
|
|
|
|
class CitaviProject():
|
|
""" A helper class representing a Citavi project. """
|
|
def __init__(self, sqlite_file):
|
|
""" Constructor to get absolute path to sqlite. """
|
|
self.sqlite_file = sqlite_file
|
|
self._is_open = False
|
|
self._is_error = False
|
|
self._sa = {} # Object namespace for sqlalchemy related objects
|
|
|
|
def __del__(self):
|
|
""" Destructor to close the citavi file. """
|
|
self.close()
|
|
|
|
def is_valid(self):
|
|
""" This method returns False in case the given sqlite file is not a valid Citavi project. """
|
|
self.open()
|
|
return self._is_open and not self._is_error
|
|
|
|
def get_persons(self):
|
|
try:
|
|
PersonClass = self._sa_sqlite_autobase.classes.Person
|
|
except:
|
|
return False
|
|
|
|
def open(self):
|
|
""" Public method to open a citavi project file. """
|
|
if not self._is_open:
|
|
self._open()
|
|
|
|
def close(self):
|
|
""" Public method to close a citavi project file. """
|
|
if self._is_open:
|
|
self._close()
|
|
|
|
def _open(self):
|
|
""" Internal method to open a citavi project file. """
|
|
try:
|
|
self._sa_sqlite_engine = create_engine('sqlite+pysqlite:///' + self.sqlite_file)
|
|
self._sa_sqlite_session = Session(self._sa_sqlite_engine)
|
|
self._sa_sqlite_meta = MetaData(bind=self._sa_sqlite_engine)
|
|
self._sa_sqlite_meta.reflect()
|
|
self._sa_sqlite_autobase = automap_base()
|
|
self._sa_sqlite_autobase.prepare(self._sa_sqlite_engine, reflect=True)
|
|
self._is_open = True
|
|
except:
|
|
self._is_error = True
|
|
self._is_open = False
|
|
|
|
def _close(self):
|
|
""" Internal method to actually close a citavi project file. """
|
|
try:
|
|
del self._sa.sqlite_engine
|
|
del self._sa.sqlite_session
|
|
del self._sa.sqlite_meta
|
|
del self._sa.sqlite_autobase
|
|
del self._sa.sqlite_engine
|
|
self._is_open = False
|
|
except:
|
|
self._is_error = True
|
|
self._is_open = True
|
|
|
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
def import_sqlite(project_id, sqlite_file):
|
|
print "This is import_sqlite speaking!"
|
|
print project_id
|
|
print sqlite_file
|
|
print "Importing data..."
|
|
|
|
# Initialize sqlite part
|
|
sqlite_engine = create_engine('sqlite+pysqlite:///project_import_9.ctt4', echo=True)
|
|
sqlite_session = Session(sqlite_engine)
|
|
sqlite_meta = MetaData(bind=sqlite_engine)
|
|
sqlite_meta.reflect()
|
|
|
|
# Initialize postresql part
|
|
psql_engine = create_engine("postgresql://citavi_mapper:foobar2000@localhost:5432/citavi_mapper")
|
|
psql_session = Session(psql_engine)
|
|
psql_engine.execute(CreateSchema('alchemytest')) # TODO: Catch Exception/Warning/Whatever here
|
|
psql_meta = MetaData(bind=psql_engine, schema='alchemytest')
|
|
|
|
# Reflect the origin - maybe psql_meta.reflect() is enough?
|
|
sqlite_autobase = automap_base()
|
|
sqlite_autobase.prepare(sqlite_engine, reflect=True)
|
|
|
|
|
|
# This small snippet prints python class code for sqlalchemy reflected classes
|
|
for table in sqlite_meta.sorted_tables:
|
|
table_name = str(table)
|
|
columns = table.columns.items()
|
|
print "class " + table_name + "(Base):"
|
|
print " __tablename__ = '" + table_name + "'"
|
|
print
|
|
for column_tuple in columns:
|
|
column_name = column_tuple[0]
|
|
actual_sqlalchemy_column = column_tuple[1]
|
|
column_type = str(vars(actual_sqlalchemy_column)['type'])
|
|
column_type = str(vars(actual_sqlalchemy_column)['type'])
|
|
#print table_name + "." + column_name + ': ' + column_type
|
|
print " " + column_name + " = Column(" + column_type + ")"
|
|
print
|
|
print
|
|
|
|
# Shove it up into postgresql's ... you know.
|
|
psql_meta.create_all(psql_engine)
|
|
|
|
#sqlite_meta.reflect()
|
|
|
|
#meta.tables['Reference']
|
|
#meta.tables['ReferenceAuthor'].select().execute().fetchall()
|
|
|
|
# Now we need to "convert" unsupported types :-/
|
|
for table in meta.sorted_tables:
|
|
table_name = str(table)
|
|
columns = table.columns.items()
|
|
for column_tuple in columns:
|
|
column_name = column_tuple[0]
|
|
actual_sqlalchemy_column = column_tuple[1]
|
|
column_type = str(vars(actual_sqlalchemy_column)['type'])
|
|
print table_name + "." + column_name + ': ' + column_type
|
|
print
|
|
print
|
|
""" |