Neo4jConnection provide the neo4j.Session on every call to the get method. And will close the connection when exiting the "with" statement.

Example

your_core_lib.yaml

core_lib:
  data:
    neo4j:
      credentials:
          username: ${oc.env:NEO4J_USERNAME}
          password: ${oc.env:NEO4J_PASSWORD}
      url:
          host: neo4j
          port: 7474

your_core_lib.py

from core_lib.connection.neo4j_connection_registry import Neo4JConnectionRegistry


class YourCoreLib(CoreLib):
    def __init__(self, conf: DictConfig):
        self.config = conf.core_lib
		    neo4j = Neo4JConnectionRegistry(self.config.core_lib.data.neo4j)  
        self.search = SearchDataAccess(neo4j)

search_data_access.py

class SearchDataAccess(DataAccess):

  	ALLOW_UDATE_FIELDS = ['name', 'gender', ...]
  	def __init__(self, db: Neo4JConnectionRegistry):
   			self._db = db    

		def search(self, name: str):
		  	with self._db.get() as session:
    				cypher = 'MATCH (me:Book {name:$name}) ...'
			      result = session.run(cypher, {'name': name})
      			return [dict(r) for r in result]
  
		def insert(self, data: dict):
        parameters = {'user_id': data[Book.id.key],
                      'name': data[Book.nick_name.key],
                      'gender': data[Book.gender.key]}
        with self._neo_db.get() as session:
        		return session.run('MERGE (u:User {user_id:$user_id}) ON CREATE SET u.name=$name, ...', parameters) 

		def update(self, user_id: int, data: dict):
				query_fields = []
        update_data = {}
        for key, value in data.items():
            if key in ALLOW_UDATE_FIELDS:
					  		query_fields.append(f'u.{key}=${key}')
                update_data[key] = value
				if query_fields:
        		fields_str = ','.join(query_fields)
            update_data['user_id'] = user_id
            with self._neo_db.get() as session:
            		return session.run(f'MATCH (u:User ) SET {fields_str} RETURN u', update_data)

Neo4jConnectionRegistry

core_lib.connection.neo4j_connection_registry.Neo4jConnectionRegistry [source]

This class is responsible for initializing the neo4j.GraphDatabase driver and passing a freshneo4j.Session on to the Neo4jConnection in the get() method.

Functions

__init__()

core_lib.connection.neo4j_connection_registry.Neo4jConnectionRegistry.__init__() [source]

Responsible for initializing neo4j.GraphDatabase from configuration

def __init__(self, config: DictConfig):

Arguments

  • config (DictConfig): Neo4j section config from Core-Lib's config.

driver()

core_lib.connection.neo4j_connection_registry.Neo4jConnectionRegistry.driver() [source]

Returns the driver.

def driver(self) -> GraphDatabase:

Returns

(GraphDatabase): Returns the Neo4j driver.

get()

core_lib.connection.neo4j_connection_registry.Neo4jConnectionRegistry.get() [source]

Returns the Neo4jConnection instance by creating a new neo4j.Session from the neo4j.GraphDatabase driver .

def get(self, *args, **kwargs) -> Neo4jConnection:

Arguments

  • *args, **kwargs: The args and kwargs of the function.

Returns

(Neo4jConnection): Returns the Neo4jConnection instance.

Neo4jConnection

core_lib.connection.neo4j_connection.Neo4jConnection [source]

This class gets the neo4j.Session session on initialization.

class Neo4jConnection(Connection):
    def __init__(self, neo4j_session: Session):
        self.neo4j_session = neo4j_session

Arguments

  • neo4j_session (Session): The neo4j.Session provided from Neo4jConnectionRegistry.