Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file ocsipersist.ml
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513(** PostgreSQL (>= 9.5) backend for Ocsipersist. *)letsection=Lwt_log.Section.make"ocsigen:ocsipersist:pgsql"moduleLwt_thread=structincludeLwtletclose_in=Lwt_io.closeletreally_input=Lwt_io.read_into_exactlyletinput_binary_int=Lwt_io.BE.read_intletinput_char=Lwt_io.read_charletoutput_string=Lwt_io.writeletoutput_binary_int=Lwt_io.BE.write_intletoutput_char=Lwt_io.write_charletflush=Lwt_io.flushletopen_connectionx=Lwt_io.open_connectionxtypeout_channel=Lwt_io.output_channeltypein_channel=Lwt_io.input_channelendmodulePGOCaml=PGOCaml_generic.Make(Lwt_thread)openLwt.InfixopenPrintfexceptionOcsipersist_errorlethost=refNoneletport=refNoneletuser=refNoneletpassword=refNoneletdatabase=ref"ocsipersist"letunix_domain_socket_dir=refNoneletsize_conn_pool=ref16letmake_hashtbl()=Hashtbl.create8letconnect()=PGOCaml.connect?host:!host?port:!port?user:!user?password:!password?database:(Some!database)?unix_domain_socket_dir:!unix_domain_socket_dir()>>=fundbhandle->PGOCaml.set_private_datadbhandle@@make_hashtbl();Lwt.returndbhandlelet(>>)fg=f>>=fun_->gletdisposedb=Lwt.catch(fun()->PGOCaml.closedb)(fun_->Lwt.return_unit)letconn_pool:(string,unit)Hashtbl.tPGOCaml.tLwt_pool.tref=(* This connection pool will be overwritten by init_fun! *)ref(Lwt_pool.create!size_conn_pool~validate:PGOCaml.alive~disposeconnect)letuse_poolf=Lwt_pool.use!conn_pool@@fundb->Lwt.catch(fun()->fdb)(function|PGOCaml.Errormsgase->Lwt_log.ign_error_f~section"postgresql protocol error: %s"msg;PGOCaml.closedb>>=fun()->Lwt.faile|Lwt.Canceledase->Lwt_log.ign_error~section"thread canceled";PGOCaml.closedb>>=fun()->Lwt.faile|e->Lwt.faile)(* escapes characters that are not in the range of 0x20..0x7e;
this is to meet PostgreSQL's format requirements for text fields
while keeping the key column readable whenever possible. *)letescape_strings=letlen=String.lengthsinletbuf=Buffer.create(len*2)infori=0tolen-1doletc=s.[i]inletcc=Char.codecinifcc<0x20||cc>0x7ethenBuffer.add_stringbuf(sprintf"\\%03o"cc)(* non-print -> \ooo *)elseifc='\\'thenBuffer.add_stringbuf"\\\\"(* \ -> \\ *)elseBuffer.add_charbufcdone;Buffer.contentsbufletunescape_stringstr=letis_first_oct_digitc=c>='0'&&c<='3'andis_oct_digitc=c>='0'&&c<='7'andoct_valc=Char.codec-0x30inletlen=String.lengthstrinletbuf=Buffer.createleninleti=ref0inwhile!i<lendoletc=str.[!i]inifc='\\'then(incri;if!i<len&&str.[!i]='\\'then(Buffer.add_charbuf'\\';incri)elseif!i+2<len&&is_first_oct_digitstr.[!i]&&is_oct_digitstr.[!i+1]&&is_oct_digitstr.[!i+2]then(letbyte=oct_valstr.[!i]inincri;letbyte=(bytelsl3)+oct_valstr.[!i]inincri;letbyte=(bytelsl3)+oct_valstr.[!i]inincri;Buffer.add_charbuf(Char.chrbyte)))else(incri;Buffer.add_charbufc)done;Buffer.contentsbuftype'aparameter=Keyofstring|Valueof'aletpack=function|Keyk->escape_stringk|Valuev->PGOCaml.string_of_bytea@@Marshal.to_stringv[]letunpack_key=unescape_stringletunpack_valuevalue=Marshal.from_string(PGOCaml.bytea_of_stringvalue)0letkey_value_of_row=function|[Somekey;Somevalue]->unpack_keykey,unpack_valuevalue|_->raiseOcsipersist_errorletreclist_lastl=matchlwith|[x]->x|_::r->list_lastr|[]->raiseNot_found(* get one value from the result of a query *)letone_value=function|[Somevalue]::_xs->unpack_valuevalue|_->raiseNot_foundletpreparedbquery=lethashtbl=PGOCaml.private_datadbin(* Get a unique name for this query using an MD5 digest. *)letname=Digest.to_hex(Digest.stringquery)in(* Have we prepared this statement already? If not, do so. *)letis_prepared=Hashtbl.memhashtblnameinbeginifis_preparedthenLwt.return()elsebeginPGOCaml.preparedb~name~query()>>Lwt.return@@Hashtbl.addhashtblname()endend>>=fun()->Lwt.returnnameletexecdbqueryparams=preparedbquery>>=funname->letparams=List.map(funx->Some(packx))paramsinPGOCaml.executedb~name~params()letcreate_tabledbtable=letquery=sprintf"CREATE TABLE IF NOT EXISTS %s \
(key TEXT, value BYTEA, PRIMARY KEY(key))"tableinexecdbquery[]>>Lwt.return()typestore=stringtype'at={store:string;name:string;}letopen_storestore=use_pool@@fundb->create_tabledbstore>>Lwt.returnstoreletmake_persistent_worker~store~name~defaultdb=letquery=sprintf"INSERT INTO %s VALUES ( $1 , $2 )
ON CONFLICT ( key ) DO NOTHING"storein(* NOTE: incompatible with < 9.5 *)execdbquery[Keyname;Valuedefault]>>Lwt.return{store;name}letmake_persistent~store~name~default=use_pool@@fundb->make_persistent_worker~store~name~defaultdbletmake_persistent_lazy_lwt~store~name~default=use_pool@@fundb->letquery=sprintf"SELECT 1 FROM %s WHERE key = $1 "storeinexecdbquery[Keyname]>>=function|[]->default()>>=fundefault->make_persistent_worker~store~name~defaultdb|_->Lwt.return{store=store;name=name}letmake_persistent_lazy~store~name~default=letdefault()=Lwt.wrapdefaultinmake_persistent_lazy_lwt~store~name~defaultletgetp=use_pool@@fundb->letquery=sprintf"SELECT value FROM %s WHERE key = $1 "p.storeinLwt.mapone_value(execdbquery[Keyp.name])letsetpv=use_pool@@fundb->letquery=sprintf"UPDATE %s SET value = $2 WHERE key = $1 "p.storeinexecdbquery[Keyp.name;Valuev]>>Lwt.return()(* FUNCTORIAL INTERFACE *******************************************************)moduletypeTABLE_CONF=sigvalname:stringendtypeinternal=stringmoduletypeCOLUMN=sigtypetvalcolumn_type:stringvalencode:t->internalvaldecode:internal->tendmoduletypeTABLE=sigtypekeytypevaluevalname:stringvalfind:key->valueLwt.tvaladd:key->value->unitLwt.tvalreplace_if_exists:key->value->unitLwt.tvalremove:key->unitLwt.tvalmodify_opt:key->(valueoption->valueoption)->unitLwt.tvallength:unit->intLwt.tvaliter:?count:int64->?gt:key->?geq:key->?lt:key->?leq:key->(key->value->unitLwt.t)->unitLwt.tvalfold:?count:int64->?gt:key->?geq:key->?lt:key->?leq:key->(key->value->'a->'aLwt.t)->'a->'aLwt.tvaliter_block:?count:int64->?gt:key->?geq:key->?lt:key->?leq:key->(key->value->unit)->unitLwt.tendmoduleTable(T:TABLE_CONF)(Key:COLUMN)(Value:COLUMN):TABLEwithtypekey=Key.tandtypevalue=Value.t=structtypekey=Key.ttypevalue=Value.tletname=T.namemoduleAux=structletexec_optdbqueryparams=preparedbquery>>=funname->PGOCaml.executedb~name~params()letexecdbqueryparams=preparedbquery>>=funname->letparams=List.map(funx->Somex)paramsinPGOCaml.executedb~name~params()letexec_dbqueryparams=execdbqueryparams>>Lwt.return_unitletencode_pairkeyvalue=[Key.encodekey;Value.encodevalue]endletinit=letcreate_tabletabledb=letquery=sprintf"CREATE TABLE IF NOT EXISTS %s \
(key %s, value %s, PRIMARY KEY (key))"tableKey.column_typeValue.column_typeinAux.exec_dbquery[]inlazy(use_pool@@create_tableT.name)letwith_tablef=Lazy.forceinit>>=fun()->use_poolfletfindkey=with_table@@fundb->letquery=sprintf"SELECT value FROM %s WHERE key = $1 "nameinAux.execdbquery[Key.encodekey]>>=function|[Somevalue]::_->Lwt.return(Value.decodevalue)|_->Lwt.failNot_foundletaddkeyvalue=with_table@@fundb->letquery=sprintf"INSERT INTO %s VALUES ($1, $2)
ON CONFLICT (key) DO UPDATE SET value = $2"nameinAux.exec_dbquery@@Aux.encode_pairkeyvalueletreplace_if_existskeyvalue=with_table@@fundb->letquery=sprintf"UPDATE %s SET value = $2 WHERE key = $1 RETURNING 0"nameinAux.execdbquery(Aux.encode_pairkeyvalue)>>=function|[]->raiseNot_found|_->Lwt.return_unitletremovekey=with_table@@fundb->letquery=sprintf"DELETE FROM %s WHERE key = $1"nameinAux.exec_dbquery[Key.encodekey]letmodify_optkeyf=with_table@@fundb->letquery=sprintf"SELECT value FROM %s WHERE key = $1"nameinAux.execdbquery[Key.encodekey]>>=funvalue->letold_value=matchvaluewith|[Somev]::_->Some(Value.decodev)|_->Noneinmatchfold_valuewith|Somenew_value->letquery=sprintf"INSERT INTO %s VALUES ($1, $2)
ON CONFLICT (key) DO UPDATE SET value = $2"nameinAux.exec_dbquery@@Aux.encode_pairkeynew_value|None->letquery=sprintf"DELETE FROM %s WHERE key = $1"nameinAux.exec_dbquery[Key.encodekey]letlength()=with_table@@fundb->letquery=sprintf"SELECT count (1) FROM %s"nameinLwt.mapone_value@@Aux.execdbquery[]letmax_iter_block_size=1000Lletreciter_rec?count?gt?geq?lt?leqflast=matchcountwithSomecwhenc<=0L->Lwt.return_unit|_->letkey_value_of_row=function|[Somekey;Somevalue]->Key.decodekey,Value.decodevalue|_->raiseOcsipersist_errorinletquery=sprintf"SELECT * FROM %s
WHERE ($1 :: %s IS NULL OR key > $1)
AND ($2 :: %s IS NULL OR key > $2)
AND ($3 :: %s IS NULL OR key >= $3)
AND ($4 :: %s IS NULL OR key < $4)
AND ($5 :: %s IS NULL OR key <= $5)
ORDER BY key LIMIT $6"nameKey.column_typeKey.column_typeKey.column_typeKey.column_typeKey.column_typeandargs=letcount=matchcountwith|Somecwhenc<=max_iter_block_size->c|_->max_iter_block_sizein[Option.mapKey.encodelast;Option.mapKey.encodegt;Option.mapKey.encodegeq;Option.mapKey.encodelt;Option.mapKey.encodeleq;Some(Int64.to_stringcount)]inwith_table(fundb->Aux.exec_optdbqueryargs)>>=funl->Lwt_list.iter_s(funrow->letk,v=key_value_of_rowrowinfkv)l>>=fun()->ifl=[]thenLwt.return_unitelseletlast,_=key_value_of_row@@list_lastlinletcount=Option.mapInt64.(func->subc@@of_int@@List.lengthl)countiniter_rec?countf?gt?geq?lt?leq(Somelast)letiter?count?gt?geq?lt?leqf=iter_rec?count?gt?geq?lt?leqfNoneletfold?count?gt?geq?lt?leqfx=letres=refxinletgkeyvalue=fkeyvalue!res>>=funres'->res:=res';Lwt.return_unitiniter?count?gt?geq?lt?leqg>>Lwt.return!resletiter_block?count:_?gt:_?geq:_?lt:_?leq:__=failwith"Ocsipersist.iter_block: not implemented"endmoduleColumn=structmoduleString:COLUMNwithtypet=string=structtypet=stringletcolumn_type="text"letencode=escape_stringletdecode=unescape_stringendmoduleFloat:COLUMNwithtypet=float=structtypet=floatletcolumn_type="float"letencode=PGOCaml.string_of_floatletdecode=PGOCaml.float_of_stringendmoduleMarshal(C:sigtypetend):COLUMNwithtypet=C.t=structtypet=C.tletcolumn_type="bytea"letencodev=PGOCaml.string_of_bytea@@Marshal.to_stringv[]letdecodev=Marshal.from_string(PGOCaml.bytea_of_stringv)0endend(******************************************************************************)type'valuetable=stringlettable_nametable=Lwt.returntableletexisting_tables=Hashtbl.create16letopen_tabletable=ifHashtbl.memexisting_tablestablethenLwt.returntableelsebeginuse_pool@@fundb->create_tabledbtable>>=fun()->Hashtbl.addexisting_tablestable();Lwt.returntableendletfindtablekey=use_pool@@fundb->letquery=sprintf"SELECT value FROM %s WHERE key = $1 "tableinLwt.mapone_value(execdbquery[Keykey])letaddtablekeyvalue=use_pool@@fundb->letquery=sprintf"INSERT INTO %s VALUES ( $1 , $2 )
ON CONFLICT ( key ) DO UPDATE SET value = $2 "table(* NOTE: incompatible with < 9.5 *)inexecdbquery[Keykey;Valuevalue]>>Lwt.return()letreplace_if_existstablekeyvalue=use_pool@@fundb->letquery=sprintf"UPDATE %s SET value = $2 WHERE key = $1 RETURNING 0"tableinexecdbquery[Keykey;Valuevalue]>>=function|[]->raiseNot_found|_->Lwt.return()letremovetablekey=use_pool@@fundb->letquery=sprintf"DELETE FROM %s WHERE key = $1 "tableinexecdbquery[Keykey]>>Lwt.return()letlengthtable=use_pool@@fundb->letquery=sprintf"SELECT count(*) FROM %s "tableinLwt.mapone_value(execdbquery[])letreciter_recftablelast=let(query,args)=matchlastwith|None->(sprintf"SELECT * FROM %s ORDER BY key LIMIT 1000"table,[])|Somelast->(sprintf"SELECT * FROM %s WHERE key > $1 ORDER BY key LIMIT 1000"table,[Keylast])in(use_pool@@fundb->execdbqueryargs)>>=funl->Lwt_list.iter_s(funrow->letkey,value=key_value_of_rowrowinfkeyvalue)l>>=fun()->ifl=[]thenLwt.return_unitelseletlast,_=key_value_of_row(list_lastl)initer_recftable(Somelast)letiter_stepftable=iter_recftableNoneletiter_table=iter_stepletfold_stepftablex=letres=refxinletgkeyvalue=fkeyvalue!res>>=funres'->res:=res';Lwt.return()initer_stepgtable>>Lwt.return!resletfold_table=fold_stepletiter_block_a_b=failwith"Ocsipersist.iter_block: not implemented"letparse_global_config=function|[]->()|[Xml.Element("database",attrs,[])]->letparse_attr=function|("host",h)->host:=Someh|("port",p)->begintryport:=Some(int_of_stringp)withFailure_->raise@@Ocsigen_extensions.Error_in_config_file"port is not an integer"end|("user",u)->user:=Someu|("password",pw)->password:=Somepw|("database",db)->database:=db|("unix_domain_socket_dir",udsd)->unix_domain_socket_dir:=Someudsd|("size_conn_pool",scp)->begintrysize_conn_pool:=int_of_stringscpwithFailure_->raise@@Ocsigen_extensions.Error_in_config_file"size_conn_pool is not an integer"end|_->raise@@Ocsigen_extensions.Error_in_config_file"Unexpected attribute for <database> in Ocsipersist config"inignore@@List.mapparse_attrattrs;()|_->raise@@Ocsigen_extensions.Error_in_config_file"Unexpected content inside Ocsipersist config"letinit_funconfig=parse_global_configconfig;conn_pool:=Lwt_pool.create!size_conn_pool~validate:PGOCaml.aliveconnectlet_=Ocsigen_extensions.register~name:"ocsipersist"~init_fun()