@@ -9,9 +9,10 @@ use crate::{
99 value_converter:: { convert_parameters, PythonDTO } ,
1010} ;
1111use deadpool_postgres:: Object ;
12+ use futures_util:: future;
1213use pyo3:: {
1314 pyclass, pymethods,
14- types:: { PyList , PyString } ,
15+ types:: { PyList , PyString , PyTuple } ,
1516 Py , PyAny , PyErr , PyObject , PyRef , PyRefMut , Python ,
1617} ;
1718use std:: { collections:: HashSet , sync:: Arc , vec} ;
@@ -163,6 +164,7 @@ impl RustTransaction {
163164
164165 Ok ( ( ) )
165166 }
167+
166168 /// Fetch single row from query.
167169 ///
168170 /// Method doesn't acquire lock on any structure fields.
@@ -213,6 +215,29 @@ impl RustTransaction {
213215
214216 Ok ( PSQLDriverSinglePyQueryResult :: new ( result) )
215217 }
218+
219+ /// Run many queries as pipeline.
220+ ///
221+ /// It can boost up querying speed.
222+ ///
223+ /// # Errors
224+ ///
225+ /// May return Err Result if can't join futures or cannot execute
226+ /// any of queries.
227+ pub async fn inner_pipeline (
228+ & self ,
229+ queries : Vec < ( String , Vec < PythonDTO > ) > ,
230+ ) -> RustPSQLDriverPyResult < Vec < PSQLDriverPyQueryResult > > {
231+ let mut futures = vec ! [ ] ;
232+ for ( querystring, params) in queries {
233+ let execute_future = self . inner_execute ( querystring, params) ;
234+ futures. push ( execute_future) ;
235+ }
236+
237+ let b = future:: try_join_all ( futures) . await ?;
238+ Ok ( b)
239+ }
240+
216241 /// Start transaction
217242 /// Set up isolation level if specified
218243 /// Set up deferable if specified
@@ -748,6 +773,48 @@ impl Transaction {
748773 } )
749774 }
750775
776+ /// Execute querystrings with parameters and return all results.
777+ ///
778+ /// Create pipeline of queries.
779+ ///
780+ /// # Errors
781+ ///
782+ /// May return Err Result if:
783+ /// 1) Cannot convert python parameters
784+ /// 2) Cannot execute any of querystring.
785+ pub fn pipeline < ' a > (
786+ & ' a self ,
787+ py : Python < ' a > ,
788+ queries : Option < & ' a PyList > ,
789+ ) -> RustPSQLDriverPyResult < & ' a PyAny > {
790+ let mut processed_queries: Vec < ( String , Vec < PythonDTO > ) > = vec ! [ ] ;
791+ if let Some ( queries) = queries {
792+ for single_query in queries {
793+ let query_tuple = single_query. downcast :: < PyTuple > ( ) . map_err ( |err| {
794+ RustPSQLDriverError :: PyToRustValueConversionError ( format ! (
795+ "Cannot cast to tuple: {err}" ,
796+ ) )
797+ } ) ?;
798+ let querystring = query_tuple. get_item ( 0 ) ?. extract :: < String > ( ) ?;
799+ match query_tuple. get_item ( 1 ) {
800+ Ok ( params) => {
801+ processed_queries. push ( ( querystring, convert_parameters ( params) ?) ) ;
802+ }
803+ Err ( _) => {
804+ processed_queries. push ( ( querystring, vec ! [ ] ) ) ;
805+ }
806+ }
807+ }
808+ }
809+
810+ let transaction_arc = self . transaction . clone ( ) ;
811+
812+ rustengine_future ( py, async move {
813+ let transaction_guard = transaction_arc. read ( ) . await ;
814+ transaction_guard. inner_pipeline ( processed_queries) . await
815+ } )
816+ }
817+
751818 /// Start the transaction.
752819 ///
753820 /// # Errors
0 commit comments