I implemented a solution for having py2neo atomic transactions, by using flask before_request() and after_request():
from flask import Flask, request, app, g
from flask_restful import Resource, Api, abort
from werkzeug.wrappers import Request
from py2neo import Graph
import blog.models
app = Flask(__name__)
api = Api(app)
def get_db():
return Graph(password="secret")
graph = get_db()
@app.before_request
def before_request():
g.tx = graph.begin()
@app.after_request
def after_request(response):
tx = g.get('tx')
if response.status_code >= 400:
tx.rollback()
else:
tx.commit()
return response
class HelloWorld(Resource):
def get(self):
print(g.get('tx')) # This works perfectly
return {'hello': 'Hello World!'}
api.add_resource(HelloWorld, '/')
It actually works, but I built a test that is not working: The value of g.tx is None even when I can see that app.before_request() is properly invoked.
This is my test:
from py2neo import Graph
from flask_restful import Resource, Api
from flask import g, Flask
from blog.rest_api import app, api, get_db
client = app.test_client()
graph = get_db()
def start_database():
tx = graph.begin()
cypher = """
MATCH (n) DETACH DELETE n
"""
tx.run(cypher)
cypher = """
CREATE (: Static)-[: Relates]->(: Editable { field_test: "original" })
"""
tx.run(cypher)
cypher = """
CREATE (: Deletable)
"""
tx.run(cypher)
tx.commit()
def test_starting_database():
start_database()
cypher = """
MATCH (: Static) return count(*) as n
"""
cursor = graph.run(cypher)
assert cursor.forward() == 1, "Test database failed to initialize properly: 'Static'"
cypher = """
MATCH (: Editable) return count(*) as n
"""
cursor = graph.run(cypher)
assert cursor.forward() == 1, "Test database failed to initialize properly: 'Editable'"
cypher = """
MATCH (: Deletable) return count(*) as n
"""
cursor = graph.run(cypher)
assert cursor.forward() == 1, "Test database failed to initialize properly: 'Deletable'"
def test_successful_transaction():
response = client.get('/execute_successful_operations/')
assert response.status_code < 400, response.status
cypher = """
MATCH (e: Editable) return e.field_test
"""
cursor = graph.run(cypher)
assert cursor.forward() == 'commited edition', "Transaction not commited: 'Editable'."
cypher = """
MATCH (: Deletable) return count(*) as n
"""
cursor = graph.run(cypher)
assert cursor.forward() == 0, "Transaction not commited: 'Deletable'."
def test_failed_transaction():
response = client.get('/execute_failed_operations/')
assert response.status_code < 400, response.status
cypher = """
MATCH (e: Editable) RETURN e.field_test
"""
cursor = graph.run(cypher)
assert cursor.forward() == 'commited edition', "Transaction not rolledback: 'Editable'."
class ExecuteSuccessfulOperationsResource(Resource):
def get(self):
with app.app_context():
tx = g.get('tx')
cypher: """
MATCH (e: Editable) SET e.field_test = 'commited edition' return e.field_test
"""
tx.run(cypher) # Error line
cypher = """
MATCH (d: Deletable) DETACH delete d
"""
tx.run(cypher)
return 'Ok'
api.add_resource(ExecuteSuccessfulOperationsResource, '/execute_successful_operations/')
class ExecuteFailedOperationsResource(Resource):
def get(self):
with app.app_context():
tx = g.get('tx')
cypher: """
MATCH (e: Editable) SET e.field_test = 'rolledback edition' return e.field_test
"""
tx.run(cypher)
cypher = """
This must generate an exception so the request is aborted and a transaction rollback must be executed.
"""
tx.run(cypher)
return 'Fail'
api.add_resource(ExecuteFailedOperationsResource, '/execute_failed_operations/')
Aucun commentaire:
Enregistrer un commentaire