12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634 |
- import os
- import socket
- from sys import platform
- from functools import wraps, partial
- from itertools import count, chain
- from weakref import WeakValueDictionary
- from errno import errorcode
- from OpenSSL._util import (
- UNSPECIFIED as _UNSPECIFIED,
- exception_from_error_queue as _exception_from_error_queue,
- ffi as _ffi,
- lib as _lib,
- make_assert as _make_assert,
- path_bytes as _path_bytes,
- text_to_bytes_and_warn as _text_to_bytes_and_warn,
- no_zero_allocator as _no_zero_allocator,
- )
- from OpenSSL.crypto import (
- FILETYPE_PEM,
- _PassphraseHelper,
- PKey,
- X509Name,
- X509,
- X509Store,
- )
- __all__ = [
- "OPENSSL_VERSION_NUMBER",
- "SSLEAY_VERSION",
- "SSLEAY_CFLAGS",
- "SSLEAY_PLATFORM",
- "SSLEAY_DIR",
- "SSLEAY_BUILT_ON",
- "SENT_SHUTDOWN",
- "RECEIVED_SHUTDOWN",
- "SSLv2_METHOD",
- "SSLv3_METHOD",
- "SSLv23_METHOD",
- "TLSv1_METHOD",
- "TLSv1_1_METHOD",
- "TLSv1_2_METHOD",
- "TLS_METHOD",
- "TLS_SERVER_METHOD",
- "TLS_CLIENT_METHOD",
- "DTLS_METHOD",
- "DTLS_SERVER_METHOD",
- "DTLS_CLIENT_METHOD",
- "SSL3_VERSION",
- "TLS1_VERSION",
- "TLS1_1_VERSION",
- "TLS1_2_VERSION",
- "TLS1_3_VERSION",
- "OP_NO_SSLv2",
- "OP_NO_SSLv3",
- "OP_NO_TLSv1",
- "OP_NO_TLSv1_1",
- "OP_NO_TLSv1_2",
- "OP_NO_TLSv1_3",
- "MODE_RELEASE_BUFFERS",
- "OP_SINGLE_DH_USE",
- "OP_SINGLE_ECDH_USE",
- "OP_EPHEMERAL_RSA",
- "OP_MICROSOFT_SESS_ID_BUG",
- "OP_NETSCAPE_CHALLENGE_BUG",
- "OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG",
- "OP_SSLREF2_REUSE_CERT_TYPE_BUG",
- "OP_MICROSOFT_BIG_SSLV3_BUFFER",
- "OP_MSIE_SSLV2_RSA_PADDING",
- "OP_SSLEAY_080_CLIENT_DH_BUG",
- "OP_TLS_D5_BUG",
- "OP_TLS_BLOCK_PADDING_BUG",
- "OP_DONT_INSERT_EMPTY_FRAGMENTS",
- "OP_CIPHER_SERVER_PREFERENCE",
- "OP_TLS_ROLLBACK_BUG",
- "OP_PKCS1_CHECK_1",
- "OP_PKCS1_CHECK_2",
- "OP_NETSCAPE_CA_DN_BUG",
- "OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG",
- "OP_NO_COMPRESSION",
- "OP_NO_QUERY_MTU",
- "OP_COOKIE_EXCHANGE",
- "OP_NO_TICKET",
- "OP_NO_RENEGOTIATION",
- "OP_ALL",
- "VERIFY_PEER",
- "VERIFY_FAIL_IF_NO_PEER_CERT",
- "VERIFY_CLIENT_ONCE",
- "VERIFY_NONE",
- "SESS_CACHE_OFF",
- "SESS_CACHE_CLIENT",
- "SESS_CACHE_SERVER",
- "SESS_CACHE_BOTH",
- "SESS_CACHE_NO_AUTO_CLEAR",
- "SESS_CACHE_NO_INTERNAL_LOOKUP",
- "SESS_CACHE_NO_INTERNAL_STORE",
- "SESS_CACHE_NO_INTERNAL",
- "SSL_ST_CONNECT",
- "SSL_ST_ACCEPT",
- "SSL_ST_MASK",
- "SSL_CB_LOOP",
- "SSL_CB_EXIT",
- "SSL_CB_READ",
- "SSL_CB_WRITE",
- "SSL_CB_ALERT",
- "SSL_CB_READ_ALERT",
- "SSL_CB_WRITE_ALERT",
- "SSL_CB_ACCEPT_LOOP",
- "SSL_CB_ACCEPT_EXIT",
- "SSL_CB_CONNECT_LOOP",
- "SSL_CB_CONNECT_EXIT",
- "SSL_CB_HANDSHAKE_START",
- "SSL_CB_HANDSHAKE_DONE",
- "Error",
- "WantReadError",
- "WantWriteError",
- "WantX509LookupError",
- "ZeroReturnError",
- "SysCallError",
- "NO_OVERLAPPING_PROTOCOLS",
- "SSLeay_version",
- "Session",
- "Context",
- "Connection",
- ]
- OPENSSL_VERSION_NUMBER = _lib.OPENSSL_VERSION_NUMBER
- SSLEAY_VERSION = _lib.SSLEAY_VERSION
- SSLEAY_CFLAGS = _lib.SSLEAY_CFLAGS
- SSLEAY_PLATFORM = _lib.SSLEAY_PLATFORM
- SSLEAY_DIR = _lib.SSLEAY_DIR
- SSLEAY_BUILT_ON = _lib.SSLEAY_BUILT_ON
- SENT_SHUTDOWN = _lib.SSL_SENT_SHUTDOWN
- RECEIVED_SHUTDOWN = _lib.SSL_RECEIVED_SHUTDOWN
- SSLv2_METHOD = 1
- SSLv3_METHOD = 2
- SSLv23_METHOD = 3
- TLSv1_METHOD = 4
- TLSv1_1_METHOD = 5
- TLSv1_2_METHOD = 6
- TLS_METHOD = 7
- TLS_SERVER_METHOD = 8
- TLS_CLIENT_METHOD = 9
- DTLS_METHOD = 10
- DTLS_SERVER_METHOD = 11
- DTLS_CLIENT_METHOD = 12
- try:
- SSL3_VERSION = _lib.SSL3_VERSION
- TLS1_VERSION = _lib.TLS1_VERSION
- TLS1_1_VERSION = _lib.TLS1_1_VERSION
- TLS1_2_VERSION = _lib.TLS1_2_VERSION
- TLS1_3_VERSION = _lib.TLS1_3_VERSION
- except AttributeError:
- # Hardcode constants for cryptography < 3.4, see
- # https://github.com/pyca/pyopenssl/pull/985#issuecomment-775186682
- SSL3_VERSION = 768
- TLS1_VERSION = 769
- TLS1_1_VERSION = 770
- TLS1_2_VERSION = 771
- TLS1_3_VERSION = 772
- OP_NO_SSLv2 = _lib.SSL_OP_NO_SSLv2
- OP_NO_SSLv3 = _lib.SSL_OP_NO_SSLv3
- OP_NO_TLSv1 = _lib.SSL_OP_NO_TLSv1
- OP_NO_TLSv1_1 = _lib.SSL_OP_NO_TLSv1_1
- OP_NO_TLSv1_2 = _lib.SSL_OP_NO_TLSv1_2
- try:
- OP_NO_TLSv1_3 = _lib.SSL_OP_NO_TLSv1_3
- except AttributeError:
- pass
- MODE_RELEASE_BUFFERS = _lib.SSL_MODE_RELEASE_BUFFERS
- OP_SINGLE_DH_USE = _lib.SSL_OP_SINGLE_DH_USE
- OP_SINGLE_ECDH_USE = _lib.SSL_OP_SINGLE_ECDH_USE
- OP_EPHEMERAL_RSA = _lib.SSL_OP_EPHEMERAL_RSA
- OP_MICROSOFT_SESS_ID_BUG = _lib.SSL_OP_MICROSOFT_SESS_ID_BUG
- OP_NETSCAPE_CHALLENGE_BUG = _lib.SSL_OP_NETSCAPE_CHALLENGE_BUG
- OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG = (
- _lib.SSL_OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG
- )
- OP_SSLREF2_REUSE_CERT_TYPE_BUG = _lib.SSL_OP_SSLREF2_REUSE_CERT_TYPE_BUG
- OP_MICROSOFT_BIG_SSLV3_BUFFER = _lib.SSL_OP_MICROSOFT_BIG_SSLV3_BUFFER
- OP_MSIE_SSLV2_RSA_PADDING = _lib.SSL_OP_MSIE_SSLV2_RSA_PADDING
- OP_SSLEAY_080_CLIENT_DH_BUG = _lib.SSL_OP_SSLEAY_080_CLIENT_DH_BUG
- OP_TLS_D5_BUG = _lib.SSL_OP_TLS_D5_BUG
- OP_TLS_BLOCK_PADDING_BUG = _lib.SSL_OP_TLS_BLOCK_PADDING_BUG
- OP_DONT_INSERT_EMPTY_FRAGMENTS = _lib.SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS
- OP_CIPHER_SERVER_PREFERENCE = _lib.SSL_OP_CIPHER_SERVER_PREFERENCE
- OP_TLS_ROLLBACK_BUG = _lib.SSL_OP_TLS_ROLLBACK_BUG
- OP_PKCS1_CHECK_1 = _lib.SSL_OP_PKCS1_CHECK_1
- OP_PKCS1_CHECK_2 = _lib.SSL_OP_PKCS1_CHECK_2
- OP_NETSCAPE_CA_DN_BUG = _lib.SSL_OP_NETSCAPE_CA_DN_BUG
- OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG = (
- _lib.SSL_OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG
- )
- OP_NO_COMPRESSION = _lib.SSL_OP_NO_COMPRESSION
- OP_NO_QUERY_MTU = _lib.SSL_OP_NO_QUERY_MTU
- OP_COOKIE_EXCHANGE = _lib.SSL_OP_COOKIE_EXCHANGE
- OP_NO_TICKET = _lib.SSL_OP_NO_TICKET
- try:
- OP_NO_RENEGOTIATION = _lib.SSL_OP_NO_RENEGOTIATION
- except AttributeError:
- pass
- OP_ALL = _lib.SSL_OP_ALL
- VERIFY_PEER = _lib.SSL_VERIFY_PEER
- VERIFY_FAIL_IF_NO_PEER_CERT = _lib.SSL_VERIFY_FAIL_IF_NO_PEER_CERT
- VERIFY_CLIENT_ONCE = _lib.SSL_VERIFY_CLIENT_ONCE
- VERIFY_NONE = _lib.SSL_VERIFY_NONE
- SESS_CACHE_OFF = _lib.SSL_SESS_CACHE_OFF
- SESS_CACHE_CLIENT = _lib.SSL_SESS_CACHE_CLIENT
- SESS_CACHE_SERVER = _lib.SSL_SESS_CACHE_SERVER
- SESS_CACHE_BOTH = _lib.SSL_SESS_CACHE_BOTH
- SESS_CACHE_NO_AUTO_CLEAR = _lib.SSL_SESS_CACHE_NO_AUTO_CLEAR
- SESS_CACHE_NO_INTERNAL_LOOKUP = _lib.SSL_SESS_CACHE_NO_INTERNAL_LOOKUP
- SESS_CACHE_NO_INTERNAL_STORE = _lib.SSL_SESS_CACHE_NO_INTERNAL_STORE
- SESS_CACHE_NO_INTERNAL = _lib.SSL_SESS_CACHE_NO_INTERNAL
- SSL_ST_CONNECT = _lib.SSL_ST_CONNECT
- SSL_ST_ACCEPT = _lib.SSL_ST_ACCEPT
- SSL_ST_MASK = _lib.SSL_ST_MASK
- SSL_CB_LOOP = _lib.SSL_CB_LOOP
- SSL_CB_EXIT = _lib.SSL_CB_EXIT
- SSL_CB_READ = _lib.SSL_CB_READ
- SSL_CB_WRITE = _lib.SSL_CB_WRITE
- SSL_CB_ALERT = _lib.SSL_CB_ALERT
- SSL_CB_READ_ALERT = _lib.SSL_CB_READ_ALERT
- SSL_CB_WRITE_ALERT = _lib.SSL_CB_WRITE_ALERT
- SSL_CB_ACCEPT_LOOP = _lib.SSL_CB_ACCEPT_LOOP
- SSL_CB_ACCEPT_EXIT = _lib.SSL_CB_ACCEPT_EXIT
- SSL_CB_CONNECT_LOOP = _lib.SSL_CB_CONNECT_LOOP
- SSL_CB_CONNECT_EXIT = _lib.SSL_CB_CONNECT_EXIT
- SSL_CB_HANDSHAKE_START = _lib.SSL_CB_HANDSHAKE_START
- SSL_CB_HANDSHAKE_DONE = _lib.SSL_CB_HANDSHAKE_DONE
- # Taken from https://golang.org/src/crypto/x509/root_linux.go
- _CERTIFICATE_FILE_LOCATIONS = [
- "/etc/ssl/certs/ca-certificates.crt", # Debian/Ubuntu/Gentoo etc.
- "/etc/pki/tls/certs/ca-bundle.crt", # Fedora/RHEL 6
- "/etc/ssl/ca-bundle.pem", # OpenSUSE
- "/etc/pki/tls/cacert.pem", # OpenELEC
- "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem", # CentOS/RHEL 7
- ]
- _CERTIFICATE_PATH_LOCATIONS = [
- "/etc/ssl/certs", # SLES10/SLES11
- ]
- # These values are compared to output from cffi's ffi.string so they must be
- # byte strings.
- _CRYPTOGRAPHY_MANYLINUX_CA_DIR = b"/opt/pyca/cryptography/openssl/certs"
- _CRYPTOGRAPHY_MANYLINUX_CA_FILE = b"/opt/pyca/cryptography/openssl/cert.pem"
class Error(Exception):
    """
    Base exception for failures reported by the `OpenSSL.SSL` API.

    The more specific exceptions defined in this module derive from it.
    """
# ``_exception_from_error_queue`` pre-bound to ``Error`` as the exception
# type, so call sites in this module can invoke it without arguments.
_raise_current_error = partial(_exception_from_error_queue, Error)

# Assertion helper produced by ``_make_assert`` for the ``Error`` type.
_openssl_assert = _make_assert(Error)
class WantReadError(Error):
    """
    :class:`Error` subclass that adds no behaviour of its own; it exists so
    callers can catch this condition separately from other errors.
    """
class WantWriteError(Error):
    """
    :class:`Error` subclass that adds no behaviour of its own; it exists so
    callers can catch this condition separately from other errors.
    """
class WantX509LookupError(Error):
    """
    :class:`Error` subclass that adds no behaviour of its own; it exists so
    callers can catch this condition separately from other errors.
    """
class ZeroReturnError(Error):
    """
    :class:`Error` subclass that adds no behaviour of its own; it exists so
    callers can catch this condition separately from other errors.
    """
class SysCallError(Error):
    """
    :class:`Error` subclass that adds no behaviour of its own; it exists so
    callers can catch this condition separately from other errors.
    """
class _CallbackExceptionHelper(object):
    """
    Common base for the wrappers that adapt Python callables into OpenSSL
    callbacks while keeping track of exceptions raised inside them.

    :ivar list _problems: Exceptions raised by user code while OpenSSL was
        waiting for a return value, so they could not propagate normally.
        They are kept here and re-raised later via :meth:`raise_if_problem`.
    """

    def __init__(self):
        self._problems = []

    def raise_if_problem(self):
        """
        Re-raise the oldest exception captured while running a callback.

        Does nothing when no exception has been recorded.  Otherwise
        ``_raise_current_error`` is called first and any resulting
        :class:`Error` is discarded, then the first stored exception is
        removed from the list and raised.
        """
        if not self._problems:
            return

        # The stored Python exception takes precedence over whatever
        # ``_raise_current_error`` reports, so that Error is swallowed.
        try:
            _raise_current_error()
        except Error:
            pass

        raise self._problems.pop(0)
class _VerifyHelper(_CallbackExceptionHelper):
    """
    Adapt a Python callable into a certificate verification callback.

    The cffi callback is exposed as ``self.callback``.  The wrapped callable
    is invoked as
    ``callback(connection, cert, error_number, error_depth, ok)``.
    """

    def __init__(self, callback):
        _CallbackExceptionHelper.__init__(self)

        @wraps(callback)
        def wrapper(ok, store_ctx):
            # Take an extra reference on the certificate before handing the
            # raw pointer to the X509 wrapper object.
            current_cert = _lib.X509_STORE_CTX_get_current_cert(store_ctx)
            _lib.X509_up_ref(current_cert)
            cert = X509._from_raw_x509_ptr(current_cert)

            error_number = _lib.X509_STORE_CTX_get_error(store_ctx)
            error_depth = _lib.X509_STORE_CTX_get_error_depth(store_ctx)

            # Map the SSL pointer stored on the store context back to the
            # Python-level Connection.
            ex_data_index = _lib.SSL_get_ex_data_X509_STORE_CTX_idx()
            ssl = _lib.X509_STORE_CTX_get_ex_data(store_ctx, ex_data_index)
            connection = Connection._reverse_mapping[ssl]

            try:
                accepted = callback(
                    connection, cert, error_number, error_depth, ok
                )
            except Exception as exc:
                # Cannot raise through OpenSSL: remember the exception and
                # report a verification failure.
                self._problems.append(exc)
                return 0

            if not accepted:
                return 0

            # The callback accepted the certificate, so reset the stored
            # error to X509_V_OK.
            _lib.X509_STORE_CTX_set_error(store_ctx, _lib.X509_V_OK)
            return 1

        self.callback = _ffi.callback(
            "int (*)(int, X509_STORE_CTX *)", wrapper
        )
# Sentinel an ALPN selection callback returns to signal that none of the
# offered protocols is acceptable.
NO_OVERLAPPING_PROTOCOLS = object()


class _ALPNSelectHelper(_CallbackExceptionHelper):
    """
    Adapt a Python callable into an ALPN selection callback.

    The cffi callback is exposed as ``self.callback``.  The wrapped callable
    is invoked as ``callback(connection, protocols)`` and must return either
    a byte string or ``NO_OVERLAPPING_PROTOCOLS``.
    """

    def __init__(self, callback):
        _CallbackExceptionHelper.__init__(self)

        @wraps(callback)
        def wrapper(ssl, out, outlen, in_, inlen, arg):
            try:
                conn = Connection._reverse_mapping[ssl]

                # The input is a sequence of length-prefixed byte strings;
                # walk it with an offset and collect each entry.
                offered = _ffi.buffer(in_, inlen)[:]
                protolist = []
                position = 0
                while position < len(offered):
                    entry_len = offered[position]
                    start = position + 1
                    protolist.append(offered[start : start + entry_len])
                    position = start + entry_len

                outbytes = callback(conn, protolist)
                if outbytes is NO_OVERLAPPING_PROTOCOLS:
                    any_accepted = False
                    outbytes = b""
                elif isinstance(outbytes, bytes):
                    any_accepted = True
                else:
                    raise TypeError(
                        "ALPN callback must return a bytestring or the "
                        "special NO_OVERLAPPING_PROTOCOLS sentinel value."
                    )

                # Keep the output buffers referenced from the connection so
                # they are not freed before OpenSSL has used them, then
                # publish them through the output parameters.
                length_cell = _ffi.new("unsigned char *", len(outbytes))
                selected = _ffi.new("unsigned char[]", outbytes)
                conn._alpn_select_callback_args = [length_cell, selected]
                outlen[0] = conn._alpn_select_callback_args[0][0]
                out[0] = conn._alpn_select_callback_args[1]

                if any_accepted:
                    return _lib.SSL_TLSEXT_ERR_OK
                return _lib.SSL_TLSEXT_ERR_NOACK
            except Exception as exc:
                # Cannot raise through OpenSSL: remember the exception and
                # fail the negotiation.
                self._problems.append(exc)
                return _lib.SSL_TLSEXT_ERR_ALERT_FATAL

        self.callback = _ffi.callback(
            (
                "int (*)(SSL *, unsigned char **, unsigned char *, "
                "const unsigned char *, unsigned int, void *)"
            ),
            wrapper,
        )
class _OCSPServerCallbackHelper(_CallbackExceptionHelper):
    """
    Adapt a Python callable into the server-side OCSP callback.

    OpenSSL defines a single OCSP callback but uses it in two incompatible
    ways.  On a server the callback supplies OCSP data and reports one of the
    SSL_TLSEXT_ERR_* codes; on a client it checks the OCSP data and returns a
    negative value on error, 0 for "not acceptable" and a positive value for
    "acceptable".  Because the return-code conventions differ, two helpers
    are needed so that an exception in user code always maps to a suitable
    error code.  These helpers also hide a few OpenSSL calls to make the
    user callbacks simpler to write.

    This is the server-side helper.  The wrapped callable is invoked as
    ``callback(connection, data)`` and must return a byte string.
    """

    def __init__(self, callback):
        _CallbackExceptionHelper.__init__(self)

        @wraps(callback)
        def wrapper(ssl, cdata):
            try:
                conn = Connection._reverse_mapping[ssl]

                # Recover the user data object, if one was registered.
                data = _ffi.from_handle(cdata) if cdata != _ffi.NULL else None

                ocsp_data = callback(conn, data)
                if not isinstance(ocsp_data, bytes):
                    raise TypeError("OCSP callback must return a bytestring.")

                # Early exit: with no OCSP data there is nothing to hand to
                # OpenSSL.
                if not ocsp_data:
                    return 3  # SSL_TLSEXT_ERR_NOACK

                # OpenSSL takes ownership of the response buffer and expects
                # it to come from OPENSSL_malloc, so copy the bytes into a
                # freshly allocated block.
                ocsp_data_length = len(ocsp_data)
                data_ptr = _lib.OPENSSL_malloc(ocsp_data_length)
                _ffi.buffer(data_ptr, ocsp_data_length)[:] = ocsp_data
                _lib.SSL_set_tlsext_status_ocsp_resp(
                    ssl, data_ptr, ocsp_data_length
                )
                return 0
            except Exception as exc:
                # Cannot raise through OpenSSL: remember the exception.
                self._problems.append(exc)
                return 2  # SSL_TLSEXT_ERR_ALERT_FATAL

        self.callback = _ffi.callback("int (*)(SSL *, void *)", wrapper)
class _OCSPClientCallbackHelper(_CallbackExceptionHelper):
    """
    Adapt a Python callable into the client-side OCSP callback.

    OpenSSL defines a single OCSP callback but uses it in two incompatible
    ways.  On a server the callback supplies OCSP data and reports one of the
    SSL_TLSEXT_ERR_* codes; on a client it checks the OCSP data and returns a
    negative value on error, 0 for "not acceptable" and a positive value for
    "acceptable".  Because the return-code conventions differ, two helpers
    are needed so that an exception in user code always maps to a suitable
    error code.  These helpers also hide a few OpenSSL calls to make the
    user callbacks simpler to write.

    This is the client-side helper.  The wrapped callable is invoked as
    ``callback(connection, ocsp_data, data)`` and its result is interpreted
    as a boolean.
    """

    def __init__(self, callback):
        _CallbackExceptionHelper.__init__(self)

        @wraps(callback)
        def wrapper(ssl, cdata):
            try:
                conn = Connection._reverse_mapping[ssl]

                # Recover the user data object, if one was registered.
                data = _ffi.from_handle(cdata) if cdata != _ffi.NULL else None

                # Fetch the OCSP response; a negative length means there is
                # no OCSP data, otherwise copy the bytes out of the buffer.
                ocsp_ptr = _ffi.new("unsigned char **")
                ocsp_len = _lib.SSL_get_tlsext_status_ocsp_resp(ssl, ocsp_ptr)
                if ocsp_len < 0:
                    ocsp_data = b""
                else:
                    ocsp_data = _ffi.buffer(ocsp_ptr[0], ocsp_len)[:]

                # 1 when the callback accepts the response, 0 otherwise.
                verdict = callback(conn, ocsp_data, data)
                return int(bool(verdict))
            except Exception as exc:
                # Cannot raise through OpenSSL: remember the exception and
                # report the error with a negative return value.
                self._problems.append(exc)
                return -1

        self.callback = _ffi.callback("int (*)(SSL *, void *)", wrapper)
class _CookieGenerateCallbackHelper(_CallbackExceptionHelper):
    """
    Adapt a Python callable into a cookie generation callback.

    The cffi callback is exposed as ``self.callback``.  The wrapped callable
    is invoked as ``callback(connection)`` and must return the cookie as a
    byte string of at most ``_MAX_COOKIE_LENGTH`` bytes.  A longer cookie, or
    any exception raised by the callable, is recorded in ``_problems`` and
    reported to OpenSSL with a zero return value.
    """

    # The DTLS cookie is carried in a field with a one-byte length prefix
    # (RFC 6347), so it can never exceed 255 bytes.
    # NOTE(review): OpenSSL's output buffer is presumably sized by
    # DTLS1_COOKIE_LENGTH; confirm 255 does not exceed it on every supported
    # build.
    _MAX_COOKIE_LENGTH = 255

    def __init__(self, callback):
        _CallbackExceptionHelper.__init__(self)

        max_length = self._MAX_COOKIE_LENGTH

        @wraps(callback)
        def wrapper(ssl, out, outlen):
            try:
                conn = Connection._reverse_mapping[ssl]
                cookie = callback(conn)
                cookie_length = len(cookie)
                # ``out`` is a raw C pointer, so cffi cannot bounds-check
                # the slice assignment below.  Reject oversized cookies
                # instead of writing past the end of OpenSSL's buffer.
                if cookie_length > max_length:
                    raise ValueError(
                        "cookie must be at most %d bytes long, got %d"
                        % (max_length, cookie_length)
                    )
                out[0:cookie_length] = cookie
                outlen[0] = cookie_length
                return 1
            except Exception as e:
                self._problems.append(e)
                # "a zero return value can be used to abort the handshake"
                return 0

        self.callback = _ffi.callback(
            "int (*)(SSL *, unsigned char *, unsigned int *)",
            wrapper,
        )
class _CookieVerifyCallbackHelper(_CallbackExceptionHelper):
    """
    Adapt a Python callable into a cookie verification callback.

    The cffi callback is exposed as ``self.callback``.  The wrapped callable
    is invoked as ``callback(connection, cookie_bytes)`` and its return value
    is passed straight back to OpenSSL.  An exception raised by the callable
    is recorded in ``_problems`` and reported as 0.
    """

    def __init__(self, callback):
        _CallbackExceptionHelper.__init__(self)

        @wraps(callback)
        def wrapper(ssl, c_cookie, cookie_len):
            try:
                conn = Connection._reverse_mapping[ssl]
                # Copy the cookie out of the C buffer before handing it to
                # user code.
                cookie = bytes(c_cookie[0:cookie_len])
                return callback(conn, cookie)
            except Exception as exc:
                self._problems.append(exc)
                return 0

        self.callback = _ffi.callback(
            "int (*)(SSL *, unsigned char *, unsigned int)",
            wrapper,
        )
def _asFileDescriptor(obj):
    """
    Convert *obj* into a non-negative integer file descriptor.

    :param obj: Either an ``int`` or an object with a ``fileno()`` method
        that returns an ``int``.
    :return: The file descriptor as an ``int``.
    :raises TypeError: If no integer could be obtained from *obj*.
    :raises ValueError: If the resulting descriptor is negative.
    """
    candidate = obj
    if not isinstance(candidate, int):
        fileno = getattr(candidate, "fileno", None)
        if fileno is not None:
            candidate = fileno()

    if not isinstance(candidate, int):
        raise TypeError("argument must be an int, or have a fileno() method.")
    if candidate < 0:
        raise ValueError(
            "file descriptor cannot be a negative integer (%i)" % (candidate,)
        )
    return candidate
def SSLeay_version(type):
    """
    Return a string describing the version of OpenSSL in use.

    :param type: One of the :const:`SSLEAY_` constants defined in this
        module.
    :return: The value reported by the library, converted with
        ``_ffi.string``.
    """
    description = _lib.SSLeay_version(type)
    return _ffi.string(description)
def _make_requires(flag, error):
    """
    Build a decorator guarding functions that depend on an optional OpenSSL
    feature.

    When the feature is missing, the decorated function is replaced by a stub
    that raises NotImplementedError, rather than letting an AttributeError
    come out of cryptography.

    :param flag: A cryptography flag that guards the functions, e.g.
        ``Cryptography_HAS_NEXTPROTONEG``.
    :param error: The message for the NotImplementedError raised when *flag*
        is false.
    :return: A decorator that returns the function unchanged when *flag* is
        true and the raising stub otherwise.
    """

    def _requires_decorator(func):
        if flag:
            return func

        @wraps(func)
        def explode(*args, **kwargs):
            raise NotImplementedError(error)

        return explode

    return _requires_decorator
# Decorator for functions that need ALPN support in the linked library.
_requires_alpn = _make_requires(
    _lib.Cryptography_HAS_ALPN,
    "ALPN not available",
)

# Decorator for functions that need key logging support.  The flag is read
# with getattr so that a binding without it counts as "not available".
_requires_keylog = _make_requires(
    getattr(_lib, "Cryptography_HAS_KEYLOG", None),
    "Key logging not available",
)
class Session(object):
    """
    Represents an SSL session: a set of connection parameters that can be
    re-used to speed up the setup of subsequent connections.

    .. versionadded:: 0.14
    """
- class Context(object):
- """
- :class:`OpenSSL.SSL.Context` instances define the parameters for setting
- up new SSL connections.
- :param method: One of TLS_METHOD, TLS_CLIENT_METHOD, TLS_SERVER_METHOD,
- DTLS_METHOD, DTLS_CLIENT_METHOD, or DTLS_SERVER_METHOD.
- SSLv23_METHOD, TLSv1_METHOD, etc. are deprecated and should
- not be used.
- """
# Protocol method constant -> name of the matching factory on ``_lib``.
_methods = {
    SSLv2_METHOD: "SSLv2_method",
    SSLv3_METHOD: "SSLv3_method",
    SSLv23_METHOD: "SSLv23_method",
    TLSv1_METHOD: "TLSv1_method",
    TLSv1_1_METHOD: "TLSv1_1_method",
    TLSv1_2_METHOD: "TLSv1_2_method",
    TLS_METHOD: "TLS_method",
    TLS_SERVER_METHOD: "TLS_server_method",
    TLS_CLIENT_METHOD: "TLS_client_method",
    DTLS_METHOD: "DTLS_method",
    DTLS_SERVER_METHOD: "DTLS_server_method",
    DTLS_CLIENT_METHOD: "DTLS_client_method",
}
# Resolve each name to the actual function, dropping the methods that are
# missing (or None) on ``_lib``.
_methods = {
    identifier: getattr(_lib, name)
    for identifier, name in _methods.items()
    if getattr(_lib, name, None) is not None
}
def __init__(self, method):
    """
    Create the underlying ``SSL_CTX`` for *method* and reset all callback
    state.

    :param method: One of the method constants that are keys of
        ``_methods``.
    :raises TypeError: If *method* is not an integer.
    :raises ValueError: If *method* is not a key of ``_methods``.
    """
    if not isinstance(method, int):
        raise TypeError("method must be an integer")

    try:
        method_factory = self._methods[method]
    except KeyError:
        raise ValueError("No such protocol")

    method_obj = method_factory()
    _openssl_assert(method_obj != _ffi.NULL)

    raw_context = _lib.SSL_CTX_new(method_obj)
    _openssl_assert(raw_context != _ffi.NULL)
    # SSL_CTX_free runs when the wrapper is garbage collected.
    self._context = _ffi.gc(raw_context, _lib.SSL_CTX_free)

    # Passphrase handling.
    self._passphrase_helper = None
    self._passphrase_callback = None
    self._passphrase_userdata = None

    # Certificate verification.
    self._verify_helper = None
    self._verify_callback = None

    # Miscellaneous callbacks and application data.
    self._info_callback = None
    self._keylog_callback = None
    self._tlsext_servername_callback = None
    self._app_data = None

    # ALPN.
    self._alpn_select_helper = None
    self._alpn_select_callback = None

    # OCSP.
    self._ocsp_helper = None
    self._ocsp_callback = None
    self._ocsp_data = None

    # Cookie callbacks.
    self._cookie_generate_helper = None
    self._cookie_verify_helper = None

    self.set_mode(_lib.SSL_MODE_ENABLE_PARTIAL_WRITE)
def set_min_proto_version(self, version):
    """
    Set the minimum supported protocol version.

    Passing 0 enables protocol versions down to the lowest version supported
    by the library.

    If the underlying OpenSSL build is missing support for the selected
    version, this method will raise an exception.

    :param version: The protocol version to use as the lower bound.
    """
    result = _lib.SSL_CTX_set_min_proto_version(self._context, version)
    _openssl_assert(result == 1)
def set_max_proto_version(self, version):
    """
    Set the maximum supported protocol version.

    Passing 0 enables protocol versions up to the highest version supported
    by the library.

    If the underlying OpenSSL build is missing support for the selected
    version, this method will raise an exception.

    :param version: The protocol version to use as the upper bound.
    """
    result = _lib.SSL_CTX_set_max_proto_version(self._context, version)
    _openssl_assert(result == 1)
def load_verify_locations(self, cafile, capath=None):
    """
    Tell SSL where to find the trusted certificates used to verify a
    certificate chain.  The certificates have to be in PEM format.

    If *capath* is passed, it must be a directory prepared using the
    ``c_rehash`` tool included with OpenSSL.  Either, but not both, of
    *cafile* or *capath* may be :data:`None`.

    :param cafile: In which file we can find the certificates (``bytes`` or
        ``unicode``).
    :param capath: In which directory we can find the certificates
        (``bytes`` or ``unicode``).

    :return: None
    """
    # A missing location is passed to OpenSSL as NULL; anything else is
    # converted with ``_path_bytes`` first.
    cafile = _ffi.NULL if cafile is None else _path_bytes(cafile)
    capath = _ffi.NULL if capath is None else _path_bytes(capath)

    loaded = _lib.SSL_CTX_load_verify_locations(
        self._context, cafile, capath
    )
    if not loaded:
        _raise_current_error()
def _wrap_callback(self, callback):
    """
    Wrap a user passphrase callback in a ``_PassphraseHelper``.

    The wrapper ignores the ``userdata`` argument it receives and passes
    ``self._passphrase_userdata`` to *callback* instead, read at call time.

    :param callback: The user callable, invoked as
        ``callback(size, verify, userdata)``.
    :return: The ``_PassphraseHelper`` built around the wrapper.
    """

    @wraps(callback)
    def wrapper(size, verify, userdata):
        return callback(size, verify, self._passphrase_userdata)

    helper = _PassphraseHelper(
        FILETYPE_PEM, wrapper, more_args=True, truncate=True
    )
    return helper
def set_passwd_cb(self, callback, userdata=None):
    """
    Set the passphrase callback.  This function will be called when a
    private key with a passphrase is loaded.

    :param callback: The Python callback to use.  It must accept three
        positional arguments:

        1. an integer giving the maximum length of the passphrase it may
           return (a longer passphrase will be truncated);
        2. a boolean which is true if the user should be prompted for the
           passphrase twice and the callback should verify that the two
           values supplied are equal;
        3. the value given as the *userdata* parameter to
           :meth:`set_passwd_cb`.

        The *callback* must return a byte string.  If an error occurs,
        *callback* should return a false value (e.g. an empty string).
    :param userdata: (optional) A Python object which will be given as
        argument to the callback.
    :raises TypeError: If *callback* is not callable.
    :return: None
    """
    if not callable(callback):
        raise TypeError("callback must be callable")

    # Keep the helper and its cffi callback referenced from the context.
    helper = self._wrap_callback(callback)
    self._passphrase_helper = helper
    self._passphrase_callback = helper.callback

    _lib.SSL_CTX_set_default_passwd_cb(
        self._context, self._passphrase_callback
    )
    self._passphrase_userdata = userdata
def set_default_verify_paths(self):
    """
    Use the platform provided CA certificates for verification. This
    method has some caveats related to the binary wheels that cryptography
    (pyOpenSSL's primary dependency) ships:

    *   macOS will only load certificates using this method if the user
        has the ``openssl@1.1`` `Homebrew <https://brew.sh>`_ formula
        installed in the default location.
    *   Windows will not work.
    *   manylinux1 cryptography wheels will work on most common Linux
        distributions in pyOpenSSL 17.1.0 and above. pyOpenSSL detects
        the manylinux1 wheel and attempts to load roots via a fallback
        path.

    :return: None
    """
    # SSL_CTX_set_default_verify_paths tries to load certs from a cafile
    # and a capath fixed at compile time, unless the corresponding
    # environment variables are present, in which case those win.
    set_result = _lib.SSL_CTX_set_default_verify_paths(self._context)
    _openssl_assert(set_result == 1)

    # Work out whether the fallback path is needed. If the user set the
    # environment variables they chose the paths themselves, so stop.
    dir_env_var = _ffi.string(_lib.X509_get_default_cert_dir_env()).decode(
        "ascii"
    )
    file_env_var = _ffi.string(
        _lib.X509_get_default_cert_file_env()
    ).decode("ascii")
    if self._check_env_vars_set(dir_env_var, file_env_var):
        return

    default_dir = _ffi.string(_lib.X509_get_default_cert_dir())
    default_file = _ffi.string(_lib.X509_get_default_cert_file())
    # The compiled-in defaults matching the exact values used by the
    # manylinux1 builds is the signal to load the fallback locations.
    is_manylinux_build = (
        default_dir == _CRYPTOGRAPHY_MANYLINUX_CA_DIR
        and default_file == _CRYPTOGRAPHY_MANYLINUX_CA_FILE
    )
    if is_manylinux_build:
        self._fallback_default_verify_paths(
            _CERTIFICATE_FILE_LOCATIONS, _CERTIFICATE_PATH_LOCATIONS
        )
- def _check_env_vars_set(self, dir_env_var, file_env_var):
- """
- Check to see if the default cert dir/file environment vars are present.
- :return: bool
- """
- return (
- os.environ.get(file_env_var) is not None
- or os.environ.get(dir_env_var) is not None
- )
- def _fallback_default_verify_paths(self, file_path, dir_path):
- """
- Default verify paths are based on the compiled version of OpenSSL.
- However, when pyca/cryptography is compiled as a manylinux1 wheel
- that compiled location can potentially be wrong. So, like Go, we
- will try a predefined set of paths and attempt to load roots
- from there.
- :return: None
- """
- for cafile in file_path:
- if os.path.isfile(cafile):
- self.load_verify_locations(cafile)
- break
- for capath in dir_path:
- if os.path.isdir(capath):
- self.load_verify_locations(None, capath)
- break
def use_certificate_chain_file(self, certfile):
    """
    Load a certificate chain from a file.

    :param certfile: The name of the certificate chain file (``bytes`` or
        ``unicode``). Must be PEM encoded.
    :return: None
    """
    chain_path = _path_bytes(certfile)
    if not _lib.SSL_CTX_use_certificate_chain_file(
        self._context, chain_path
    ):
        _raise_current_error()
def use_certificate_file(self, certfile, filetype=FILETYPE_PEM):
    """
    Load a certificate from a file.

    :param certfile: The name of the certificate file (``bytes`` or
        ``unicode``).
    :param filetype: (optional) The encoding of the file, which is either
        :const:`FILETYPE_PEM` or :const:`FILETYPE_ASN1`. The default is
        :const:`FILETYPE_PEM`.
    :return: None
    :raises TypeError: If *filetype* is not an integer.
    """
    # The path is converted before the filetype is validated.
    cert_path = _path_bytes(certfile)
    if not isinstance(filetype, int):
        raise TypeError("filetype must be an integer")

    if not _lib.SSL_CTX_use_certificate_file(
        self._context, cert_path, filetype
    ):
        _raise_current_error()
def use_certificate(self, cert):
    """
    Load a certificate from an X509 object.

    :param cert: The X509 object
    :return: None
    :raises TypeError: If *cert* is not an X509 instance.
    """
    if not isinstance(cert, X509):
        raise TypeError("cert must be an X509 instance")

    if not _lib.SSL_CTX_use_certificate(self._context, cert._x509):
        _raise_current_error()
def add_extra_chain_cert(self, certobj):
    """
    Add a certificate to the chain.

    :param certobj: The X509 certificate object to add to the chain
    :return: None
    :raises TypeError: If *certobj* is not an X509 instance.
    """
    if not isinstance(certobj, X509):
        raise TypeError("certobj must be an X509 instance")

    # A duplicate, not the caller's own X509, is handed to OpenSSL.
    duplicate = _lib.X509_dup(certobj._x509)
    added = _lib.SSL_CTX_add_extra_chain_cert(self._context, duplicate)
    if added:
        return

    # TODO: This is untested.
    # On failure the duplicate is freed here before the error is raised.
    _lib.X509_free(duplicate)
    _raise_current_error()
def _raise_passphrase_exception(self):
    """
    Raise the error for a failed private key load.

    If a passphrase helper is installed it is first asked to raise any
    problem it recorded (as :exc:`Error`); otherwise, or if it had nothing
    to raise, the current OpenSSL error is raised.
    """
    helper = self._passphrase_helper
    if helper is not None:
        helper.raise_if_problem(Error)

    _raise_current_error()
def use_privatekey_file(self, keyfile, filetype=_UNSPECIFIED):
    """
    Load a private key from a file.

    :param keyfile: The name of the key file (``bytes`` or ``unicode``)
    :param filetype: (optional) The encoding of the file, which is either
        :const:`FILETYPE_PEM` or :const:`FILETYPE_ASN1`. The default is
        :const:`FILETYPE_PEM`.
    :return: None
    :raises TypeError: If *filetype* is given and is not an integer.
    """
    key_path = _path_bytes(keyfile)

    if filetype is not _UNSPECIFIED and not isinstance(filetype, int):
        raise TypeError("filetype must be an integer")
    # An omitted filetype means PEM.
    encoding = FILETYPE_PEM if filetype is _UNSPECIFIED else filetype

    loaded = _lib.SSL_CTX_use_PrivateKey_file(
        self._context, key_path, encoding
    )
    if not loaded:
        self._raise_passphrase_exception()
def use_privatekey(self, pkey):
    """
    Load a private key from a PKey object.

    :param pkey: The PKey object
    :return: None
    :raises TypeError: If *pkey* is not a PKey instance.
    """
    if not isinstance(pkey, PKey):
        raise TypeError("pkey must be a PKey instance")

    if not _lib.SSL_CTX_use_PrivateKey(self._context, pkey._pkey):
        self._raise_passphrase_exception()
def check_privatekey(self):
    """
    Verify that the private key (loaded with :meth:`use_privatekey`)
    matches the certificate (loaded with :meth:`use_certificate`).

    :return: :data:`None` (raises :exc:`Error` if something's wrong)
    """
    key_matches = _lib.SSL_CTX_check_private_key(self._context)
    if not key_matches:
        _raise_current_error()
def load_client_ca(self, cafile):
    """
    Load the trusted certificates that will be sent to the client. This
    does not actually imply any of the certificates are trusted; that
    must be configured separately.

    :param bytes cafile: The path to a certificates file in PEM format.
    :return: None
    """
    ca_path = _text_to_bytes_and_warn("cafile", cafile)
    ca_list = _lib.SSL_load_client_CA_file(ca_path)
    _openssl_assert(ca_list != _ffi.NULL)
    _lib.SSL_CTX_set_client_CA_list(self._context, ca_list)
def set_session_id(self, buf):
    """
    Set the session id to *buf* within which a session can be reused for
    this Context object. This is needed when doing session resumption,
    because there is no way for a stored session to know which Context
    object it is associated with.

    :param bytes buf: The session id.
    :returns: None
    """
    buf = _text_to_bytes_and_warn("buf", buf)
    status = _lib.SSL_CTX_set_session_id_context(
        self._context, buf, len(buf)
    )
    _openssl_assert(status == 1)
def set_session_cache_mode(self, mode):
    """
    Set the behavior of the session cache used by all connections using
    this Context. See :const:`SESS_CACHE_*` for details about particular
    modes.

    :param mode: One or more of the SESS_CACHE_* flags (combine using
        bitwise or)
    :returns: The previously set caching mode.
    :raises TypeError: If *mode* is not an integer.

    .. versionadded:: 0.14
    """
    if isinstance(mode, int):
        return _lib.SSL_CTX_set_session_cache_mode(self._context, mode)

    raise TypeError("mode must be an integer")
def get_session_cache_mode(self):
    """
    Report the session cache mode currently in use.

    :returns: The currently used cache mode.

    .. versionadded:: 0.14
    """
    current_mode = _lib.SSL_CTX_get_session_cache_mode(self._context)
    return current_mode
def set_verify(self, mode, callback=None):
    """
    Set the verification flags for this Context object to *mode* and
    specify that *callback* should be used for verification callbacks.

    :param mode: The verify mode, this should be one of
        :const:`VERIFY_NONE` and :const:`VERIFY_PEER`. If
        :const:`VERIFY_PEER` is used, *mode* can be OR:ed with
        :const:`VERIFY_FAIL_IF_NO_PEER_CERT` and
        :const:`VERIFY_CLIENT_ONCE` to further control the behaviour.
    :param callback: The optional Python verification callback to use.
        This should take five arguments: A Connection object, an X509
        object, and three integer variables, which are in turn potential
        error number, error depth and return code. *callback* should
        return True if verification passes and False otherwise.
        If omitted, OpenSSL's default verification is used.
    :return: None
    :raises TypeError: If *mode* is not an integer, or *callback* is
        given but not callable.

    See SSL_CTX_set_verify(3SSL) for further details.
    """
    if not isinstance(mode, int):
        raise TypeError("mode must be an integer")

    if callback is None:
        # No Python callback: clear any stored helper and pass NULL.
        self._verify_helper = None
        self._verify_callback = None
        _lib.SSL_CTX_set_verify(self._context, mode, _ffi.NULL)
        return

    if not callable(callback):
        raise TypeError("callback must be callable")

    helper = _VerifyHelper(callback)
    self._verify_helper = helper
    self._verify_callback = helper.callback
    _lib.SSL_CTX_set_verify(self._context, mode, self._verify_callback)
def set_verify_depth(self, depth):
    """
    Set the maximum depth for the certificate chain verification that
    shall be allowed for this Context object.

    :param depth: An integer specifying the verify depth
    :return: None
    :raises TypeError: If *depth* is not an integer.
    """
    depth_is_int = isinstance(depth, int)
    if not depth_is_int:
        raise TypeError("depth must be an integer")

    _lib.SSL_CTX_set_verify_depth(self._context, depth)
def get_verify_mode(self):
    """
    Return this Context object's verify mode, as set by
    :meth:`set_verify`.

    :return: The verify mode
    """
    verify_mode = _lib.SSL_CTX_get_verify_mode(self._context)
    return verify_mode
def get_verify_depth(self):
    """
    Return this Context object's verify depth, as set by
    :meth:`set_verify_depth`.

    :return: The verify depth
    """
    verify_depth = _lib.SSL_CTX_get_verify_depth(self._context)
    return verify_depth
def load_tmp_dh(self, dhfile):
    """
    Load parameters for Ephemeral Diffie-Hellman.

    :param dhfile: The file to load EDH parameters from (``bytes`` or
        ``unicode``).
    :return: None
    :raises Error: If the file cannot be opened, if no DH parameters can
        be read from it, or if OpenSSL rejects the parameters.
    """
    dhfile = _path_bytes(dhfile)

    bio = _lib.BIO_new_file(dhfile, b"r")
    if bio == _ffi.NULL:
        _raise_current_error()
    bio = _ffi.gc(bio, _lib.BIO_free)

    dh = _lib.PEM_read_bio_DHparams(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL)
    # Check the parse result right away so that a NULL pointer is neither
    # registered for DH_free nor passed on to SSL_CTX_set_tmp_dh.
    _openssl_assert(dh != _ffi.NULL)
    dh = _ffi.gc(dh, _lib.DH_free)

    res = _lib.SSL_CTX_set_tmp_dh(self._context, dh)
    _openssl_assert(res == 1)
def set_tmp_ecdh(self, curve):
    """
    Select a curve to use for ECDHE key exchange.

    :param curve: A curve object to use as returned by either
        :meth:`OpenSSL.crypto.get_elliptic_curve` or
        :meth:`OpenSSL.crypto.get_elliptic_curves`.
    :return: None
    """
    ec_key = curve._to_EC_KEY()
    _lib.SSL_CTX_set_tmp_ecdh(self._context, ec_key)
def set_cipher_list(self, cipher_list):
    """
    Set the list of ciphers to be used in this context.

    See the OpenSSL manual for more information (e.g.
    :manpage:`ciphers(1)`).

    :param bytes cipher_list: An OpenSSL cipher string.
    :return: None
    :raises TypeError: If *cipher_list* is not a byte string after
        conversion.
    :raises Error: If only the TLS 1.3 default suites end up enabled,
        which is treated as "no cipher match".
    """
    cipher_list = _text_to_bytes_and_warn("cipher_list", cipher_list)
    if not isinstance(cipher_list, bytes):
        raise TypeError("cipher_list must be a byte string.")

    status = _lib.SSL_CTX_set_cipher_list(self._context, cipher_list)
    _openssl_assert(status == 1)

    # In OpenSSL 1.1.1 setting the cipher list will always return TLS 1.3
    # ciphers even if you pass an invalid cipher. Applications (like
    # Twisted) have tests that depend on an error being raised if an
    # invalid cipher string is passed, but without the following check
    # for the TLS 1.3 specific cipher suites it would never error.
    tls13_only = [
        "TLS_AES_256_GCM_SHA384",
        "TLS_CHACHA20_POLY1305_SHA256",
        "TLS_AES_128_GCM_SHA256",
    ]
    probe = Connection(self, None)
    if probe.get_cipher_list() == tls13_only:
        no_match = (
            "SSL routines",
            "SSL_CTX_set_cipher_list",
            "no cipher match",
        )
        raise Error([no_match])
def set_client_ca_list(self, certificate_authorities):
    """
    Set the list of preferred client certificate signers for this server
    context.

    This list of certificate authorities will be sent to the client when
    the server requests a client certificate.

    :param certificate_authorities: a sequence of X509Names.
    :return: None
    :raises TypeError: If any element is not an X509Name.

    .. versionadded:: 0.10
    """
    stack = _lib.sk_X509_NAME_new_null()
    _openssl_assert(stack != _ffi.NULL)

    try:
        for authority in certificate_authorities:
            if not isinstance(authority, X509Name):
                raise TypeError(
                    "client CAs must be X509Name objects, not %s "
                    "objects" % (type(authority).__name__,)
                )
            # A duplicate of each name is pushed onto the stack.
            duplicate = _lib.X509_NAME_dup(authority._name)
            _openssl_assert(duplicate != _ffi.NULL)
            pushed = _lib.sk_X509_NAME_push(stack, duplicate)
            if not pushed:
                _lib.X509_NAME_free(duplicate)
                _raise_current_error()
    except Exception:
        # The stack is freed here before the exception propagates.
        _lib.sk_X509_NAME_free(stack)
        raise

    _lib.SSL_CTX_set_client_CA_list(self._context, stack)
def add_client_ca(self, certificate_authority):
    """
    Add the CA certificate to the list of preferred signers for this
    context.

    The list of certificate authorities will be sent to the client when
    the server requests a client certificate.

    :param certificate_authority: certificate authority's X509
        certificate.
    :return: None
    :raises TypeError: If *certificate_authority* is not an X509
        instance.

    .. versionadded:: 0.10
    """
    if not isinstance(certificate_authority, X509):
        raise TypeError("certificate_authority must be an X509 instance")

    ca_x509 = certificate_authority._x509
    status = _lib.SSL_CTX_add_client_CA(self._context, ca_x509)
    _openssl_assert(status == 1)
def set_timeout(self, timeout):
    """
    Set the timeout for newly created sessions for this Context object to
    *timeout*. The default value is 300 seconds. See the OpenSSL manual
    for more information (e.g. :manpage:`SSL_CTX_set_timeout(3)`).

    :param timeout: The timeout in (whole) seconds
    :return: The previous session timeout
    :raises TypeError: If *timeout* is not an integer.
    """
    if isinstance(timeout, int):
        return _lib.SSL_CTX_set_timeout(self._context, timeout)

    raise TypeError("timeout must be an integer")
def get_timeout(self):
    """
    Return the session timeout, as set by :meth:`set_timeout`. The
    default is 300 seconds.

    :return: The session timeout
    """
    session_timeout = _lib.SSL_CTX_get_timeout(self._context)
    return session_timeout
def set_info_callback(self, callback):
    """
    Set the information callback to *callback*. This function will be
    called from time to time during SSL handshakes.

    :param callback: The Python callback to use. This should take three
        arguments: a Connection object and two integers. The first
        integer specifies where in the SSL handshake the function was
        called, and the other the return code from a (possibly failed)
        internal function call.
    :return: None
    """

    @wraps(callback)
    def wrapper(ssl, where, return_code):
        # Translate the raw SSL pointer back into its Connection object.
        connection = Connection._reverse_mapping[ssl]
        callback(connection, where, return_code)

    signature = "void (*)(const SSL *, int, int)"
    # The cffi callback object is stored on the context.
    self._info_callback = _ffi.callback(signature, wrapper)
    _lib.SSL_CTX_set_info_callback(self._context, self._info_callback)
@_requires_keylog
def set_keylog_callback(self, callback):
    """
    Set the TLS key logging callback to *callback*. This function will be
    called whenever TLS key material is generated or received, in order
    to allow applications to store this keying material for debugging
    purposes.

    :param callback: The Python callback to use. This should take two
        arguments: a Connection object and a bytestring that contains
        the key material in the format used by NSS for its SSLKEYLOGFILE
        debugging output.
    :return: None
    """

    @wraps(callback)
    def wrapper(ssl, line):
        # Convert the C string first, then look up the Connection.
        key_material = _ffi.string(line)
        callback(Connection._reverse_mapping[ssl], key_material)

    signature = "void (*)(const SSL *, const char *)"
    # The cffi callback object is stored on the context.
    self._keylog_callback = _ffi.callback(signature, wrapper)
    _lib.SSL_CTX_set_keylog_callback(self._context, self._keylog_callback)
def get_app_data(self):
    """
    Return the application data (supplied via :meth:`set_app_data()`).

    :return: The application data
    """
    app_data = self._app_data
    return app_data
def set_app_data(self, data):
    """
    Store the application data (returned later by get_app_data()).

    :param data: Any Python object
    :return: None
    """
    setattr(self, "_app_data", data)
def get_cert_store(self):
    """
    Get the certificate store for the context. This can be used to add
    "trusted" certificates without using the
    :meth:`load_verify_locations` method.

    :return: A X509Store object or None if it does not have one.
    """
    raw_store = _lib.SSL_CTX_get_cert_store(self._context)
    if raw_store != _ffi.NULL:
        # Build the wrapper without running X509Store.__init__, then
        # point it at the store obtained from the context.
        wrapper = X509Store.__new__(X509Store)
        wrapper._store = raw_store
        return wrapper

    # TODO: This is untested.
    return None
def set_options(self, options):
    """
    Add options. Options set before are not cleared!
    This method should be used with the :const:`OP_*` constants.

    :param options: The options to add.
    :return: The new option bitmask.
    :raises TypeError: If *options* is not an integer.
    """
    if isinstance(options, int):
        return _lib.SSL_CTX_set_options(self._context, options)

    raise TypeError("options must be an integer")
def set_mode(self, mode):
    """
    Add modes via bitmask. Modes set before are not cleared! This method
    should be used with the :const:`MODE_*` constants.

    :param mode: The mode to add.
    :return: The new mode bitmask.
    :raises TypeError: If *mode* is not an integer.
    """
    if isinstance(mode, int):
        return _lib.SSL_CTX_set_mode(self._context, mode)

    raise TypeError("mode must be an integer")
def set_tlsext_servername_callback(self, callback):
    """
    Specify a callback function to be called when clients specify a
    server name.

    :param callback: The callback function. It will be invoked with one
        argument, the Connection instance.

    .. versionadded:: 0.13
    """

    @wraps(callback)
    def wrapper(ssl, alert, arg):
        # Only the Connection is passed on; ``alert`` and ``arg`` are
        # unused and the wrapper always returns 0.
        connection = Connection._reverse_mapping[ssl]
        callback(connection)
        return 0

    signature = "int (*)(SSL *, int *, void *)"
    # The cffi callback object is stored on the context.
    self._tlsext_servername_callback = _ffi.callback(signature, wrapper)
    _lib.SSL_CTX_set_tlsext_servername_callback(
        self._context, self._tlsext_servername_callback
    )
def set_tlsext_use_srtp(self, profiles):
    """
    Enable support for negotiating SRTP keying material.

    :param bytes profiles: A colon delimited list of protection profile
        names, like ``b'SRTP_AES128_CM_SHA1_80:SRTP_AES128_CM_SHA1_32'``.
    :return: None
    :raises TypeError: If *profiles* is not a byte string.
    """
    if not isinstance(profiles, bytes):
        raise TypeError("profiles must be a byte string.")

    # Success is signalled by a return value of 0 here, not 1.
    status = _lib.SSL_CTX_set_tlsext_use_srtp(self._context, profiles)
    _openssl_assert(status == 0)
@_requires_alpn
def set_alpn_protos(self, protos):
    """
    Specify the protocols that the client is prepared to speak after the
    TLS connection has been negotiated using Application Layer Protocol
    Negotiation.

    :param protos: A list of the protocols to be offered to the server.
        This list should be a Python list of bytestrings representing the
        protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``.
    :raises ValueError: If *protos* is empty.
    """
    # Different versions of OpenSSL are inconsistent about how they handle
    # empty proto lists (see #1043), so we avoid the problem entirely by
    # rejecting them ourselves.
    if not protos:
        raise ValueError("at least one protocol must be specified")

    # Each protocol is emitted as a one-byte length followed by its name,
    # and all of them are concatenated into a single byte string.
    length_prefixed = chain.from_iterable(
        (bytes((len(proto),)), proto) for proto in protos
    )
    protostr = b"".join(length_prefixed)

    # Build a C string from the list. We don't need to save this off
    # because OpenSSL immediately copies the data out.
    input_str = _ffi.new("unsigned char[]", protostr)

    # https://www.openssl.org/docs/man1.1.0/man3/SSL_CTX_set_alpn_protos.html:
    # SSL_CTX_set_alpn_protos() and SSL_set_alpn_protos()
    # return 0 on success, and non-0 on failure.
    # WARNING: these functions reverse the return value convention.
    status = _lib.SSL_CTX_set_alpn_protos(
        self._context, input_str, len(protostr)
    )
    _openssl_assert(status == 0)
@_requires_alpn
def set_alpn_select_callback(self, callback):
    """
    Specify a callback function that will be called on the server when a
    client offers protocols using ALPN.

    :param callback: The callback function. It will be invoked with two
        arguments: the Connection, and a list of offered protocols as
        bytestrings, e.g ``[b'http/1.1', b'spdy/2']``. It can return
        one of those bytestrings to indicate the chosen protocol, the
        empty bytestring to terminate the TLS connection, or the
        :py:obj:`NO_OVERLAPPING_PROTOCOLS` to indicate that no offered
        protocol was selected, but that the connection should not be
        aborted.
    """
    helper = _ALPNSelectHelper(callback)
    # Both the helper and its callback are stored on the context.
    self._alpn_select_helper = helper
    self._alpn_select_callback = helper.callback
    _lib.SSL_CTX_set_alpn_select_cb(
        self._context, self._alpn_select_callback, _ffi.NULL
    )
def _set_ocsp_callback(self, helper, data):
    """
    Shared implementation behind ``set_ocsp_server_callback`` and
    ``set_ocsp_client_callback``, which is almost all of their work.

    :param helper: The OCSP callback helper whose ``callback`` attribute
        is registered with OpenSSL.
    :param data: Opaque user data, or :data:`None` for no data.
    """
    self._ocsp_helper = helper
    self._ocsp_callback = helper.callback
    # ``None`` becomes NULL; anything else is wrapped in a cffi handle
    # that is stored on the context.
    self._ocsp_data = _ffi.NULL if data is None else _ffi.new_handle(data)

    status = _lib.SSL_CTX_set_tlsext_status_cb(
        self._context, self._ocsp_callback
    )
    _openssl_assert(status == 1)

    status = _lib.SSL_CTX_set_tlsext_status_arg(
        self._context, self._ocsp_data
    )
    _openssl_assert(status == 1)
def set_ocsp_server_callback(self, callback, data=None):
    """
    Set a callback to provide OCSP data to be stapled to the TLS handshake
    on the server side.

    :param callback: The callback function. It will be invoked with two
        arguments: the Connection, and the optional arbitrary data you
        have provided. The callback must return a bytestring that
        contains the OCSP data to staple to the handshake. If no OCSP
        data is available for this connection, return the empty
        bytestring.
    :param data: Some opaque data that will be passed into the callback
        function when called. This can be used to avoid needing to do
        complex data lookups or to keep track of what context is being
        used. This parameter is optional.
    """
    self._set_ocsp_callback(_OCSPServerCallbackHelper(callback), data)
def set_ocsp_client_callback(self, callback, data=None):
    """
    Set a callback to validate OCSP data stapled to the TLS handshake on
    the client side.

    :param callback: The callback function. It will be invoked with three
        arguments: the Connection, a bytestring containing the stapled
        OCSP assertion, and the optional arbitrary data you have
        provided. The callback must return a boolean that indicates the
        result of validating the OCSP data: ``True`` if the OCSP data is
        valid and the certificate can be trusted, or ``False`` if either
        the OCSP data is invalid or the certificate has been revoked.
    :param data: Some opaque data that will be passed into the callback
        function when called. This can be used to avoid needing to do
        complex data lookups or to keep track of what context is being
        used. This parameter is optional.
    """
    self._set_ocsp_callback(_OCSPClientCallbackHelper(callback), data)
def set_cookie_generate_callback(self, callback):
    """
    Register *callback* as this context's cookie generation callback.

    The callback is wrapped in a ``_CookieGenerateCallbackHelper``, which
    is stored on the context.

    :param callback: The Python callback to wrap and register.
    :return: None
    """
    helper = _CookieGenerateCallbackHelper(callback)
    self._cookie_generate_helper = helper
    _lib.SSL_CTX_set_cookie_generate_cb(
        self._context,
        self._cookie_generate_helper.callback,
    )
def set_cookie_verify_callback(self, callback):
    """
    Register *callback* as this context's cookie verification callback.

    The callback is wrapped in a ``_CookieVerifyCallbackHelper``, which is
    stored on the context.

    :param callback: The Python callback to wrap and register.
    :return: None
    """
    helper = _CookieVerifyCallbackHelper(callback)
    self._cookie_verify_helper = helper
    _lib.SSL_CTX_set_cookie_verify_cb(
        self._context,
        self._cookie_verify_helper.callback,
    )
- class Connection(object):
- _reverse_mapping = WeakValueDictionary()
def __init__(self, context, socket=None):
    """
    Create a new Connection object, using the given OpenSSL.SSL.Context
    instance and socket.

    :param context: An SSL Context to use for this connection
    :param socket: The socket to use for transport layer. When omitted,
        a pair of memory BIOs is used instead.
    :raises TypeError: If *context* is not a Context instance.
    """
    if not isinstance(context, Context):
        raise TypeError("context must be a Context instance")

    raw_ssl = _lib.SSL_new(context._context)
    self._ssl = _ffi.gc(raw_ssl, _lib.SSL_free)
    # We set SSL_MODE_AUTO_RETRY to handle situations where OpenSSL returns
    # an SSL_ERROR_WANT_READ when processing a non-application data packet
    # even though there is still data on the underlying transport.
    # See https://github.com/openssl/openssl/issues/6234 for more details.
    _lib.SSL_set_mode(self._ssl, _lib.SSL_MODE_AUTO_RETRY)
    self._context = context
    self._app_data = None

    # References to strings used for Application Layer Protocol
    # Negotiation. These strings get copied at some point but it's well
    # after the callback returns, so we have to hang them somewhere to
    # avoid them getting freed.
    self._alpn_select_callback_args = None

    # Reference the verify_callback of the Context. This ensures that if
    # set_verify is called again after the SSL object has been created we
    # do not point to a dangling reference
    self._verify_helper = context._verify_helper
    self._verify_callback = context._verify_callback

    # And likewise for the cookie callbacks
    self._cookie_generate_helper = context._cookie_generate_helper
    self._cookie_verify_helper = context._cookie_verify_helper

    # Lets callbacks map the raw SSL pointer back to this Connection.
    self._reverse_mapping[self._ssl] = self

    if socket is not None:
        # Socket transport: no memory BIOs, bind the socket's descriptor.
        self._into_ssl = None
        self._from_ssl = None
        self._socket = socket
        set_result = _lib.SSL_set_fd(
            self._ssl, _asFileDescriptor(self._socket)
        )
        _openssl_assert(set_result == 1)
    else:
        self._socket = None
        # Don't set up any gc for these, SSL_free will take care of them.
        self._into_ssl = _lib.BIO_new(_lib.BIO_s_mem())
        _openssl_assert(self._into_ssl != _ffi.NULL)

        self._from_ssl = _lib.BIO_new(_lib.BIO_s_mem())
        _openssl_assert(self._from_ssl != _ffi.NULL)

        _lib.SSL_set_bio(self._ssl, self._into_ssl, self._from_ssl)
- def __getattr__(self, name):
- """
- Look up attributes on the wrapped socket object if they are not found
- on the Connection object.
- """
- if self._socket is None:
- raise AttributeError(
- "'%s' object has no attribute '%s'"
- % (self.__class__.__name__, name)
- )
- else:
- return getattr(self._socket, name)
def _raise_ssl_error(self, ssl, result):
    """
    Translate the outcome of an OpenSSL I/O call into an exception.

    Any problem recorded by the context's verify, ALPN-select or OCSP
    helpers is raised first. Otherwise ``SSL_get_error`` is consulted and
    the matching exception is raised; nothing is raised for
    ``SSL_ERROR_NONE``.

    :param ssl: The SSL object the call was made on.
    :param result: The value returned by the OpenSSL call.
    """
    # Helpers are looked up and checked one at a time, in this order.
    for helper_name in (
        "_verify_helper",
        "_alpn_select_helper",
        "_ocsp_helper",
    ):
        helper = getattr(self._context, helper_name)
        if helper is not None:
            helper.raise_if_problem()

    error = _lib.SSL_get_error(ssl, result)
    if error == _lib.SSL_ERROR_WANT_READ:
        raise WantReadError()
    if error == _lib.SSL_ERROR_WANT_WRITE:
        raise WantWriteError()
    if error == _lib.SSL_ERROR_ZERO_RETURN:
        raise ZeroReturnError()
    if error == _lib.SSL_ERROR_WANT_X509_LOOKUP:
        # TODO: This is untested.
        raise WantX509LookupError()

    if error == _lib.SSL_ERROR_SYSCALL:
        if _lib.ERR_peek_error() != 0:
            # TODO: This is untested.
            _raise_current_error()
            return
        # The OpenSSL error queue is empty, so report the system error
        # number if there is one, and an unexpected EOF otherwise.
        if result < 0:
            if platform == "win32":
                errno = _ffi.getwinerror()[0]
            else:
                errno = _ffi.errno
            if errno != 0:
                raise SysCallError(errno, errorcode.get(errno))
        raise SysCallError(-1, "Unexpected EOF")

    if error != _lib.SSL_ERROR_NONE:
        _raise_current_error()
def get_context(self):
    """
    Return the :class:`Context` object associated with this
    :class:`Connection`.
    """
    context = self._context
    return context
def set_context(self, context):
    """
    Switch this connection to a new session context.

    :param context: A :class:`Context` instance giving the new session
        context to use.
    :raises TypeError: If *context* is not a Context instance.
    """
    if not isinstance(context, Context):
        raise TypeError("context must be a Context instance")

    new_ctx = context._context
    _lib.SSL_set_SSL_CTX(self._ssl, new_ctx)
    self._context = context
def get_servername(self):
    """
    Retrieve the servername extension value if provided in the client
    hello message, or None if there wasn't one.

    :return: A byte string giving the server name or :data:`None`.

    .. versionadded:: 0.13
    """
    raw_name = _lib.SSL_get_servername(
        self._ssl, _lib.TLSEXT_NAMETYPE_host_name
    )
    if raw_name != _ffi.NULL:
        return _ffi.string(raw_name)

    return None
def set_ciphertext_mtu(self, mtu):
    """
    For DTLS, set the maximum UDP payload size (*not* including IP/UDP
    overhead).

    Note that you might have to set :data:`OP_NO_QUERY_MTU` to prevent
    OpenSSL from spontaneously clearing this.

    :param mtu: An integer giving the maximum transmission unit.

    .. versionadded:: 21.1
    """
    ssl = self._ssl
    _lib.SSL_set_mtu(ssl, mtu)
def get_cleartext_mtu(self):
    """
    For DTLS, get the maximum size of unencrypted data you can pass to
    :meth:`write` without exceeding the MTU (as passed to
    :meth:`set_ciphertext_mtu`).

    :return: The effective MTU as an integer.
    :raises NotImplementedError: If the binding lacks
        ``DTLS_get_data_mtu``.

    .. versionadded:: 21.1
    """
    if hasattr(_lib, "DTLS_get_data_mtu"):
        return _lib.DTLS_get_data_mtu(self._ssl)

    raise NotImplementedError("requires OpenSSL 1.1.1 or better")
def set_tlsext_host_name(self, name):
    """
    Set the value of the servername extension to send in the client
    hello.

    :param name: A byte string giving the name.
    :raises TypeError: If *name* is not a byte string or contains a NUL
        byte.

    .. versionadded:: 0.13
    """
    if not isinstance(name, bytes):
        raise TypeError("name must be a byte string")
    if b"\0" in name:
        raise TypeError("name must not contain NUL byte")

    # XXX I guess this can fail sometimes?
    _lib.SSL_set_tlsext_host_name(self._ssl, name)
def pending(self):
    """
    Get the number of bytes that can be safely read from the SSL buffer
    (**not** the underlying transport buffer).

    :return: The number of bytes available in the receive buffer.
    """
    available = _lib.SSL_pending(self._ssl)
    return available
def send(self, buf, flags=0):
    """
    Send data on the connection. NOTE: If you get one of the WantRead,
    WantWrite or WantX509Lookup exceptions on this, you have to call the
    method again with the SAME buffer.

    :param buf: The string, buffer or memoryview to send
    :param flags: (optional) Included for compatibility with the socket
        API, the value is ignored
    :return: The number of bytes written
    :raises ValueError: If *buf* is longer than 2**31-1.
    """
    # Backward compatibility
    buf = _text_to_bytes_and_warn("buf", buf)

    with _ffi.from_buffer(buf) as data:
        # check len(buf) instead of len(data) for testability
        too_large = len(buf) > 2147483647
        if too_large:
            raise ValueError(
                "Cannot send more than 2**31-1 bytes at once."
            )

        written = _lib.SSL_write(self._ssl, data, len(data))
        self._raise_ssl_error(self._ssl, written)
        return written

write = send
def sendall(self, buf, flags=0):
    """
    Send "all" data on the connection. This calls ``SSL_write``
    repeatedly until all data is sent. If an error occurs, it's
    impossible to tell how much data has been sent.

    :param buf: The string, buffer or memoryview to send
    :param flags: (optional) Included for compatibility with the socket
        API, the value is ignored
    :return: The number of bytes written
    """
    buf = _text_to_bytes_and_warn("buf", buf)

    with _ffi.from_buffer(buf) as data:
        remaining = len(buf)
        sent_so_far = 0

        while remaining:
            # SSL_write's num arg is an int,
            # so we cannot send more than 2**31-1 bytes at once.
            chunk_size = min(remaining, 2147483647)
            written = _lib.SSL_write(
                self._ssl, data + sent_so_far, chunk_size
            )
            self._raise_ssl_error(self._ssl, written)
            sent_so_far += written
            remaining -= written

        return sent_so_far
def recv(self, bufsiz, flags=None):
    """
    Read data from the connection.

    :param bufsiz: The maximum number of bytes to read
    :param flags: (optional) The only supported flag is ``MSG_PEEK``,
        all other flags are ignored.
    :return: The data read from the Connection
    """
    scratch = _no_zero_allocator("char[]", bufsiz)

    # With MSG_PEEK the data is left in the SSL buffer for a later read.
    peeking = flags is not None and flags & socket.MSG_PEEK
    if peeking:
        count_read = _lib.SSL_peek(self._ssl, scratch, bufsiz)
    else:
        count_read = _lib.SSL_read(self._ssl, scratch, bufsiz)

    self._raise_ssl_error(self._ssl, count_read)
    return _ffi.buffer(scratch, count_read)[:]


read = recv
def recv_into(self, buffer, nbytes=None, flags=None):
    """
    Read data from the connection and store it in the provided buffer
    instead of creating a new string.

    :param buffer: The buffer to copy into.
    :param nbytes: (optional) The maximum number of bytes to read into the
        buffer. If not present, defaults to the size of the buffer. If
        larger than the size of the buffer, is reduced to the size of the
        buffer.
    :param flags: (optional) The only supported flag is ``MSG_PEEK``,
        all other flags are ignored.
    :return: The number of bytes read into the buffer.
    """
    capacity = len(buffer)
    nbytes = capacity if nbytes is None else min(nbytes, capacity)

    # We need to create a temporary buffer. This is annoying, it would be
    # better if we could pass memoryviews straight into the SSL_read call,
    # but right now we can't. Revisit this if CFFI gets that ability.
    scratch = _no_zero_allocator("char[]", nbytes)

    peeking = flags is not None and flags & socket.MSG_PEEK
    if peeking:
        count_read = _lib.SSL_peek(self._ssl, scratch, nbytes)
    else:
        count_read = _lib.SSL_read(self._ssl, scratch, nbytes)
    self._raise_ssl_error(self._ssl, count_read)

    # This strange line is all to avoid a memory copy. The buffer protocol
    # should allow us to assign a CFFI buffer to the LHS of this line, but
    # on CPython 3.3+ that segfaults. As a workaround, we can temporarily
    # wrap it in a memoryview.
    buffer[:count_read] = memoryview(_ffi.buffer(scratch, count_read))

    return count_read
def _handle_bio_errors(self, bio, result):
    """
    Translate a failed BIO operation into a Python exception.

    :param bio: The BIO on which the operation failed.
    :param result: The value returned by the failed BIO call (unused).
    """
    if not _lib.BIO_should_retry(bio):
        # TODO: This is untested.
        _raise_current_error()
        return

    if _lib.BIO_should_read(bio):
        raise WantReadError()
    if _lib.BIO_should_write(bio):
        # TODO: This is untested.
        raise WantWriteError()
    if _lib.BIO_should_io_special(bio):
        # TODO: This is untested. I think io_special means the socket
        # BIO has a not-yet connected socket.
        raise ValueError("BIO_should_io_special")

    # TODO: This is untested.
    raise ValueError("unknown bio failure")
def bio_read(self, bufsiz):
    """
    If the Connection was created with a memory BIO, read bytes from the
    write end of that memory BIO.

    Many Connection methods will add bytes which must be read in this
    manner or the buffer will eventually fill up and the Connection will
    be able to take no further actions.

    :param bufsiz: The maximum number of bytes to read
    :return: The string read.
    :raises TypeError: If the Connection has no outgoing memory BIO or
        *bufsiz* is not an integer.
    """
    outgoing = self._from_ssl
    if outgoing is None:
        raise TypeError("Connection sock was not None")

    if not isinstance(bufsiz, int):
        raise TypeError("bufsiz must be an integer")

    scratch = _no_zero_allocator("char[]", bufsiz)
    count_read = _lib.BIO_read(self._from_ssl, scratch, bufsiz)
    if count_read <= 0:
        self._handle_bio_errors(self._from_ssl, count_read)

    return _ffi.buffer(scratch, count_read)[:]
def bio_write(self, buf):
    """
    If the Connection was created with a memory BIO, add bytes to the
    read end of that memory BIO.

    The Connection can then read the bytes (for example, in response to a
    call to :meth:`recv`).

    :param buf: The string to put into the memory BIO.
    :return: The number of bytes written
    :raises TypeError: If the Connection has no incoming memory BIO.
    """
    payload = _text_to_bytes_and_warn("buf", buf)

    if self._into_ssl is None:
        raise TypeError("Connection sock was not None")

    with _ffi.from_buffer(payload) as data:
        written = _lib.BIO_write(self._into_ssl, data, len(data))
        if written <= 0:
            self._handle_bio_errors(self._into_ssl, written)
        return written
def renegotiate(self):
    """
    Start renegotiating the session, unless a renegotiation is already
    pending.

    :return: True if the renegotiation can be started, False otherwise
    :rtype: bool
    """
    if self.renegotiate_pending():
        return False

    started = _lib.SSL_renegotiate(self._ssl) == 1
    _openssl_assert(started)
    return True
def do_handshake(self):
    """
    Run the SSL handshake (usually called after :meth:`renegotiate` or
    one of :meth:`set_accept_state` or :meth:`set_connect_state`).

    This can raise the same exceptions as :meth:`send` and :meth:`recv`.

    :return: None.
    """
    status = _lib.SSL_do_handshake(self._ssl)
    self._raise_ssl_error(self._ssl, status)
def renegotiate_pending(self):
    """
    Tell whether a renegotiation is in progress; the answer becomes False
    once a renegotiation is finished.

    :return: Whether there's a renegotiation in progress
    :rtype: bool
    """
    pending_flag = _lib.SSL_renegotiate_pending(self._ssl)
    return pending_flag == 1
def total_renegotiations(self):
    """
    Report the total number of renegotiations.

    :return: The number of renegotiations.
    :rtype: int
    """
    total = _lib.SSL_total_renegotiations(self._ssl)
    return total
def connect(self, addr):
    """
    Put the SSL object into connect (client) state, then call the
    :meth:`connect` method of the underlying socket.

    SSL is set up on the socket using the :class:`Context` object supplied
    to this :class:`Connection` object at creation.

    :param addr: A remote address
    :return: What the socket's connect method returns
    """
    _lib.SSL_set_connect_state(self._ssl)
    outcome = self._socket.connect(addr)
    return outcome
def connect_ex(self, addr):
    """
    Call the :meth:`connect_ex` method of the underlying socket and set up
    SSL on the socket, using the Context object supplied to this Connection
    object at creation.

    The connection is switched to connect state before the socket's
    :meth:`connect_ex` is invoked.

    :param addr: A remote address
    :return: What the socket's connect_ex method returns
    """
    # Look the socket method up first so a missing attribute surfaces
    # before the SSL state is touched.
    socket_connect_ex = self._socket.connect_ex
    self.set_connect_state()
    outcome = socket_connect_ex(addr)
    return outcome
def accept(self):
    """
    Call the :meth:`accept` method of the underlying socket and wrap the
    returned socket in a new :class:`Connection` that shares this
    object's Context and is placed in accept (server) state.

    :return: A *(conn, addr)* pair where *conn* is the new
        :class:`Connection` object created, and *address* is as returned by
        the socket's :meth:`accept`.
    """
    raw_client, peer_addr = self._socket.accept()
    wrapped = Connection(self._context, raw_client)
    wrapped.set_accept_state()
    return (wrapped, peer_addr)
def DTLSv1_listen(self):
    """
    Invoke the OpenSSL function DTLSv1_listen on this connection. See the
    OpenSSL manual for more details.

    :return: None
    :raises WantReadError: If DTLSv1_listen returned zero.
    """
    # Possible future extension: return the BIO_ADDR in some form.
    peer_addr = _lib.BIO_ADDR_new()
    try:
        status = _lib.DTLSv1_listen(self._ssl, peer_addr)
    finally:
        # The address is not exposed, so it is always released here.
        _lib.BIO_ADDR_free(peer_addr)

    # Give the cookie callbacks' helpers a chance to report a problem
    # before the return code is interpreted.
    generate_helper = self._cookie_generate_helper
    if generate_helper is not None:
        generate_helper.raise_if_problem()
    verify_helper = self._cookie_verify_helper
    if verify_helper is not None:
        verify_helper.raise_if_problem()

    # DTLSv1_listen is weird. A zero return value means 'didn't find a
    # ClientHello with valid cookie, but keep trying'. So basically
    # WantReadError. But it doesn't work correctly with _raise_ssl_error.
    # So we raise it manually instead.
    if status == 0:
        raise WantReadError()
    if status < 0:
        self._raise_ssl_error(self._ssl, status)
def bio_shutdown(self):
    """
    If the Connection was created with a memory BIO, this method can be
    used to indicate that *end of file* has been reached on the read end of
    that memory BIO.

    :return: None
    :raises TypeError: If the Connection has no memory BIOs.
    """
    # The call below operates on ``_into_ssl``, so that is the BIO that
    # must be validated. The historical ``_from_ssl`` check is kept so
    # every case that used to raise TypeError still does.
    if self._from_ssl is None or self._into_ssl is None:
        raise TypeError("Connection sock was not None")

    _lib.BIO_set_mem_eof_return(self._into_ssl, 0)
def shutdown(self):
    """
    Send the shutdown message to the Connection.

    :return: True if the shutdown completed successfully (i.e. both sides
        have sent closure alerts), False otherwise (in which case you
        call :meth:`recv` or :meth:`send` when the connection becomes
        readable/writeable).
    """
    status = _lib.SSL_shutdown(self._ssl)
    if status < 0:
        # Negative results are turned into an exception by the helper;
        # nothing is returned on this path.
        self._raise_ssl_error(self._ssl, status)
        return None
    if status > 0:
        return True
    return False
def get_cipher_list(self):
    """
    Collect the list of ciphers used by the Connection object.

    :return: A list of native cipher strings.
    """
    names = []
    index = 0
    while True:
        # OpenSSL signals the end of the list with a NULL pointer.
        entry = _lib.SSL_get_cipher_list(self._ssl, index)
        if entry == _ffi.NULL:
            break
        names.append(_ffi.string(entry).decode("utf-8"))
        index += 1
    return names
def get_client_ca_list(self):
    """
    Get CAs whose certificates are suggested for client authentication.

    :return: If this is a server connection, the list of certificate
        authorities that will be sent or has been sent to the client, as
        controlled by this :class:`Connection`'s :class:`Context`.

        If this is a client connection, the list will be empty until the
        connection with the server is established.

    .. versionadded:: 0.10
    """
    name_stack = _lib.SSL_get_client_CA_list(self._ssl)
    if name_stack == _ffi.NULL:
        # TODO: This is untested.
        return []

    names = []
    for index in range(_lib.sk_X509_NAME_num(name_stack)):
        borrowed = _lib.sk_X509_NAME_value(name_stack, index)
        # Duplicate each entry so the Python object owns its own copy,
        # released through the registered free function.
        owned = _lib.X509_NAME_dup(borrowed)
        _openssl_assert(owned != _ffi.NULL)

        wrapper = X509Name.__new__(X509Name)
        wrapper._name = _ffi.gc(owned, _lib.X509_NAME_free)
        names.append(wrapper)
    return names
def makefile(self, *args, **kwargs):
    """
    Always fails: makefile() is not implemented, since there is no dup
    semantics for SSL connections.

    :raise: NotImplementedError
    """
    message = "Cannot make file object of OpenSSL.SSL.Connection"
    raise NotImplementedError(message)
def get_app_data(self):
    """
    Return the application data stored by :meth:`set_app_data`.

    :return: The application data
    """
    stored = self._app_data
    return stored
def set_app_data(self, data):
    """
    Store application data on the Connection.

    :param data: The application data
    :return: None
    """
    setattr(self, "_app_data", data)
def get_shutdown(self):
    """
    Read the shutdown state of the Connection.

    :return: The shutdown state, a bitvector of SENT_SHUTDOWN,
        RECEIVED_SHUTDOWN.
    """
    state = _lib.SSL_get_shutdown(self._ssl)
    return state
def set_shutdown(self, state):
    """
    Overwrite the shutdown state of the Connection.

    :param state: bitvector of SENT_SHUTDOWN, RECEIVED_SHUTDOWN.
    :return: None
    :raises TypeError: If *state* is not an integer.
    """
    if isinstance(state, int):
        _lib.SSL_set_shutdown(self._ssl, state)
        return

    raise TypeError("state must be an integer")
def get_state_string(self):
    """
    Return a verbose string detailing the state of the Connection.

    :return: A string representing the state
    :rtype: bytes
    """
    description = _lib.SSL_state_string_long(self._ssl)
    return _ffi.string(description)
def server_random(self):
    """
    Retrieve the random value used with the server hello message.

    :return: The server random value, or :obj:`None` if the connection
        has no session.
    """
    if _lib.SSL_get_session(self._ssl) == _ffi.NULL:
        return None

    # First call with a NULL buffer only asks for the required length.
    size = _lib.SSL_get_server_random(self._ssl, _ffi.NULL, 0)
    _openssl_assert(size > 0)

    out = _no_zero_allocator("unsigned char[]", size)
    _lib.SSL_get_server_random(self._ssl, out, size)
    return _ffi.buffer(out, size)[:]
def client_random(self):
    """
    Retrieve the random value used with the client hello message.

    :return: The client random value, or :obj:`None` if the connection
        has no session.
    """
    if _lib.SSL_get_session(self._ssl) == _ffi.NULL:
        return None

    # First call with a NULL buffer only asks for the required length.
    size = _lib.SSL_get_client_random(self._ssl, _ffi.NULL, 0)
    _openssl_assert(size > 0)

    out = _no_zero_allocator("unsigned char[]", size)
    _lib.SSL_get_client_random(self._ssl, out, size)
    return _ffi.buffer(out, size)[:]
def master_key(self):
    """
    Retrieve the value of the master key for this session.

    :return: The master key, or :obj:`None` if the connection has no
        session.
    """
    current = _lib.SSL_get_session(self._ssl)
    if current == _ffi.NULL:
        return None

    # First call with a NULL buffer only asks for the required length.
    size = _lib.SSL_SESSION_get_master_key(current, _ffi.NULL, 0)
    _openssl_assert(size > 0)

    out = _no_zero_allocator("unsigned char[]", size)
    _lib.SSL_SESSION_get_master_key(current, out, size)
    return _ffi.buffer(out, size)[:]
def export_keying_material(self, label, olen, context=None):
    """
    Obtain keying material for application use.

    :param: label - a disambiguating label string as described in RFC 5705
    :param: olen - the length of the exported key material in bytes
    :param: context - a per-association context value
    :return: the exported key material bytes
    """
    out = _no_zero_allocator("unsigned char[]", olen)

    # Without a context, OpenSSL receives a NULL buffer and use_context=0.
    if context is None:
        ctx_buf, ctx_len, use_ctx = _ffi.NULL, 0, 0
    else:
        ctx_buf, ctx_len, use_ctx = context, len(context), 1

    status = _lib.SSL_export_keying_material(
        self._ssl, out, olen, label, len(label), ctx_buf, ctx_len, use_ctx
    )
    _openssl_assert(status == 1)
    return _ffi.buffer(out, olen)[:]
def sock_shutdown(self, *args, **kwargs):
    """
    Forward to the :meth:`shutdown` method of the underlying socket.
    See :manpage:`shutdown(2)`.

    :return: What the socket's shutdown() method returns
    """
    underlying = self._socket
    return underlying.shutdown(*args, **kwargs)
def get_certificate(self):
    """
    Retrieve the local certificate (if any)

    :return: The local certificate, or :obj:`None` if there is none.
    """
    raw = _lib.SSL_get_certificate(self._ssl)
    if raw == _ffi.NULL:
        return None

    # Take an extra reference before handing the pointer to the wrapper.
    _lib.X509_up_ref(raw)
    return X509._from_raw_x509_ptr(raw)
def get_peer_certificate(self):
    """
    Retrieve the other side's certificate (if any)

    :return: The peer's certificate, or :obj:`None` if there is none.
    """
    raw = _lib.SSL_get_peer_certificate(self._ssl)
    if raw == _ffi.NULL:
        return None
    return X509._from_raw_x509_ptr(raw)
@staticmethod
def _cert_stack_to_list(cert_stack):
    """
    Internal helper to convert a STACK_OF(X509) to a list of X509
    instances.
    """
    certs = []
    total = _lib.sk_X509_num(cert_stack)
    for index in range(total):
        raw = _lib.sk_X509_value(cert_stack, index)
        _openssl_assert(raw != _ffi.NULL)

        # Take a reference for each wrapper created from a stack entry.
        refs = _lib.X509_up_ref(raw)
        _openssl_assert(refs >= 1)

        certs.append(X509._from_raw_x509_ptr(raw))
    return certs
def get_peer_cert_chain(self):
    """
    Retrieve the other side's certificate chain (if any)

    :return: A list of X509 instances giving the peer's certificate chain,
        or None if it does not have one.
    """
    chain_stack = _lib.SSL_get_peer_cert_chain(self._ssl)
    has_chain = chain_stack != _ffi.NULL
    if not has_chain:
        return None
    return self._cert_stack_to_list(chain_stack)
def get_verified_chain(self):
    """
    Retrieve the verified certificate chain of the peer including the
    peer's end entity certificate.

    It must be called after a session has been successfully established.
    If peer verification was not successful the chain may be incomplete,
    invalid, or None.

    :return: A list of X509 instances giving the peer's verified
        certificate chain, or None if it does not have one.

    .. versionadded:: 20.0
    """
    # OpenSSL 1.1+
    chain_stack = _lib.SSL_get0_verified_chain(self._ssl)
    has_chain = chain_stack != _ffi.NULL
    if not has_chain:
        return None
    return self._cert_stack_to_list(chain_stack)
def want_read(self):
    """
    Checks if more data has to be read from the transport layer to complete
    an operation.

    :return: True iff more data has to be read
    :rtype: bool
    """
    # The binding returns a C int; coerce it so the documented True/False
    # contract holds. ``bool`` is an ``int`` subclass, so callers that
    # compare the result with 0 or 1 keep working.
    return bool(_lib.SSL_want_read(self._ssl))
def want_write(self):
    """
    Checks if there is data to write to the transport layer to complete an
    operation.

    :return: True iff there is data to write
    :rtype: bool
    """
    # The binding returns a C int; coerce it so the documented True/False
    # contract holds. ``bool`` is an ``int`` subclass, so callers that
    # compare the result with 0 or 1 keep working.
    return bool(_lib.SSL_want_write(self._ssl))
def set_accept_state(self):
    """
    Switch the connection to server mode. The handshake will be
    handled automatically by read/write.

    :return: None
    """
    ssl_handle = self._ssl
    _lib.SSL_set_accept_state(ssl_handle)
def set_connect_state(self):
    """
    Switch the connection to client mode. The handshake will be
    handled automatically by read/write.

    :return: None
    """
    ssl_handle = self._ssl
    _lib.SSL_set_connect_state(ssl_handle)
def get_session(self):
    """
    Return the Session currently used.

    :return: An instance of :class:`OpenSSL.SSL.Session` or
        :obj:`None` if no session exists.

    .. versionadded:: 0.14
    """
    raw = _lib.SSL_get1_session(self._ssl)
    if raw == _ffi.NULL:
        return None

    # Build the wrapper without running __init__ and attach the pointer
    # with SSL_SESSION_free registered as its destructor.
    wrapper = Session.__new__(Session)
    wrapper._session = _ffi.gc(raw, _lib.SSL_SESSION_free)
    return wrapper
def set_session(self, session):
    """
    Choose the session to be used when the TLS/SSL connection is
    established.

    :param session: A Session instance representing the session to use.
    :returns: None
    :raises TypeError: If *session* is not a Session instance.

    .. versionadded:: 0.14
    """
    if not isinstance(session, Session):
        raise TypeError("session must be a Session instance")

    status = _lib.SSL_set_session(self._ssl, session._session)
    _openssl_assert(status == 1)
def _get_finished_message(self, function):
    """
    Shared implementation of :meth:`get_finished` and
    :meth:`get_peer_finished`.

    :param function: Either :data:`SSL_get_finished`: or
        :data:`SSL_get_peer_finished`.

    :return: :data:`None` if the desired message has not yet been
        received, otherwise the contents of the message.
    :rtype: :class:`bytes` or :class:`NoneType`
    """
    # The OpenSSL documentation says nothing about what might happen if the
    # count argument given is zero. Specifically, it doesn't say whether
    # the output buffer may be NULL in that case or not. Inspection of the
    # implementation reveals that it calls memcpy() unconditionally.
    # Section 7.1.4, paragraph 1 of the C standard suggests that
    # memcpy(NULL, source, 0) is not guaranteed to produce defined (let
    # alone desirable) behavior (though it probably does on just about
    # every implementation...)
    #
    # Allocate a tiny buffer to pass in (instead of just passing NULL as
    # one might expect) for the initial call so as to be safe against this
    # potentially undefined behavior.
    probe = _ffi.new("char[]", 0)
    needed = function(self._ssl, probe, 0)
    if needed == 0:
        # No Finished message so far.
        return None

    message = _no_zero_allocator("char[]", needed)
    function(self._ssl, message, needed)
    return _ffi.buffer(message, needed)[:]
def get_finished(self):
    """
    Obtain the latest TLS Finished message that we sent.

    :return: The contents of the message or :obj:`None` if the TLS
        handshake has not yet completed.
    :rtype: :class:`bytes` or :class:`NoneType`

    .. versionadded:: 0.15
    """
    getter = _lib.SSL_get_finished
    return self._get_finished_message(getter)
def get_peer_finished(self):
    """
    Obtain the latest TLS Finished message that we received from the peer.

    :return: The contents of the message or :obj:`None` if the TLS
        handshake has not yet completed.
    :rtype: :class:`bytes` or :class:`NoneType`

    .. versionadded:: 0.15
    """
    getter = _lib.SSL_get_peer_finished
    return self._get_finished_message(getter)
def get_cipher_name(self):
    """
    Obtain the name of the currently used cipher.

    :returns: The name of the currently used cipher or :obj:`None`
        if no connection has been established.
    :rtype: :class:`unicode` or :class:`NoneType`

    .. versionadded:: 0.15
    """
    current = _lib.SSL_get_current_cipher(self._ssl)
    if current == _ffi.NULL:
        return None

    raw_name = _ffi.string(_lib.SSL_CIPHER_get_name(current))
    return raw_name.decode("utf-8")
def get_cipher_bits(self):
    """
    Obtain the number of secret bits of the currently used cipher.

    :returns: The number of secret bits of the currently used cipher
        or :obj:`None` if no connection has been established.
    :rtype: :class:`int` or :class:`NoneType`

    .. versionadded:: 0.15
    """
    current = _lib.SSL_get_current_cipher(self._ssl)
    if current == _ffi.NULL:
        return None

    secret_bits = _lib.SSL_CIPHER_get_bits(current, _ffi.NULL)
    return secret_bits
def get_cipher_version(self):
    """
    Obtain the protocol version of the currently used cipher.

    :returns: The protocol name of the currently used cipher
        or :obj:`None` if no connection has been established.
    :rtype: :class:`unicode` or :class:`NoneType`

    .. versionadded:: 0.15
    """
    current = _lib.SSL_get_current_cipher(self._ssl)
    if current == _ffi.NULL:
        return None

    raw_version = _ffi.string(_lib.SSL_CIPHER_get_version(current))
    return raw_version.decode("utf-8")
def get_protocol_version_name(self):
    """
    Retrieve the protocol version of the current connection.

    :returns: The TLS version of the current connection, for example
        the value for TLS 1.2 would be ``TLSv1.2`` or ``Unknown``
        for connections that were not successfully established.
    :rtype: :class:`unicode`
    """
    raw_version = _lib.SSL_get_version(self._ssl)
    return _ffi.string(raw_version).decode("utf-8")
def get_protocol_version(self):
    """
    Retrieve the SSL or TLS protocol version of the current connection.

    :returns: The TLS version of the current connection. For example,
        it will return ``0x769`` for connections made over TLS version 1.
    :rtype: :class:`int`
    """
    return _lib.SSL_version(self._ssl)
@_requires_alpn
def set_alpn_protos(self, protos):
    """
    Specify the client's ALPN protocol list.

    These protocols are offered to the server during protocol negotiation.

    :param protos: A list of the protocols to be offered to the server.
        This list should be a Python list of bytestrings representing the
        protocols to offer, e.g. ``[b'http/1.1', b'spdy/2']``.
    :raises ValueError: If *protos* is empty.
    """
    # Different versions of OpenSSL are inconsistent about how they handle
    # empty proto lists (see #1043), so we avoid the problem entirely by
    # rejecting them ourselves.
    if not protos:
        raise ValueError("at least one protocol must be specified")

    # Build the wire format: each protocol preceded by a single byte
    # holding its length.
    pieces = []
    for proto in protos:
        pieces.append(bytes((len(proto),)))
        pieces.append(proto)
    wire = b"".join(pieces)

    # Build a C string from the list. We don't need to save this off
    # because OpenSSL immediately copies the data out.
    c_wire = _ffi.new("unsigned char[]", wire)

    # https://www.openssl.org/docs/man1.1.0/man3/SSL_CTX_set_alpn_protos.html:
    # SSL_CTX_set_alpn_protos() and SSL_set_alpn_protos()
    # return 0 on success, and non-0 on failure.
    # WARNING: these functions reverse the return value convention.
    status = _lib.SSL_set_alpn_protos(self._ssl, c_wire, len(wire))
    _openssl_assert(status == 0)
@_requires_alpn
def get_alpn_proto_negotiated(self):
    """
    Get the protocol that was negotiated by ALPN.

    :returns: A bytestring of the protocol name. If no protocol has been
        negotiated yet, returns an empty string.
    """
    data = _ffi.new("unsigned char **")
    data_len = _ffi.new("unsigned int *")

    _lib.SSL_get0_alpn_selected(self._ssl, data, data_len)

    # ``data_len`` itself is a freshly allocated, non-NULL pointer and is
    # therefore always truthy; the reported length lives in
    # ``data_len[0]``. Test that value so the "nothing negotiated" case
    # returns early instead of building a buffer from ``data[0]``.
    if not data_len[0]:
        return b""

    return _ffi.buffer(data[0], data_len[0])[:]
def request_ocsp(self):
    """
    Request that the server sends stapled OCSP data, if available.

    If this is not called on the client side then the server will not
    send OCSP data. Should be used in conjunction with
    :meth:`Context.set_ocsp_client_callback`.
    """
    status_type = _lib.TLSEXT_STATUSTYPE_ocsp
    outcome = _lib.SSL_set_tlsext_status_type(self._ssl, status_type)
    _openssl_assert(outcome == 1)
# Initialize the OpenSSL SSL library. This is similar to the initialization
# calls at the end of OpenSSL/crypto.py, but it is exercised mostly by the
# Context initializer.
_lib.SSL_library_init()
|