Source code for graphlite.graph

from contextlib import closing, contextmanager
from sqlite3 import Connection
from threading import Lock

import graphlite.sql as SQL
from graphlite.query import Query
from graphlite.transaction import Transaction, AbortSignal


[docs]class Graph(object): """ Initializes a new Graph object. :param uri: The URI of the SQLite file. :param graphs: Graphs to create. """ def __init__(self, uri, graphs=[]): self.uri = uri self.db = Connection( database=uri, check_same_thread=False, isolation_level=None, ) self.lock = Lock() self.setup_sql(graphs)
[docs] def setup_sql(self, graphs): """ Sets up the SQL tables for the graph object, and creates indexes as well. :param graphs: The graphs to create. """ with closing(self.db.cursor()) as cursor: for item in graphs: cursor.execute(SQL.CREATE_TABLE % (item)) for index in SQL.INDEXES: cursor.execute(index % (item)) self.db.commit()
[docs] def close(self): """ Close the SQLite connection. """ self.db.close()
def __contains__(self, edge): """ Checks if an edge exists within the database with the given source and destination nodes. :param edge: The edge to query. """ with closing(self.db.cursor()) as cursor: cursor.execute(*SQL.select_one(edge.src, edge.rel, edge.dst)) return bool(tuple(cursor)) @property
[docs] def find(self): """ Returns a Query object. """ return Query(self.db)
@contextmanager
[docs] def transaction(self): """ A context manager that returns a Transaction object to the caller. All operations must then be performed on the transaction object. """ trans = Transaction(db=self.db, lock=self.lock) try: yield trans if trans.defined: trans.commit() except AbortSignal: pass