Elasticsearch client for Zope3
This package provides an elasticsearch client for Zope3.
README
This package provides an elasticsearch client. Note that our elasticsearch server stub uses a non-default port (45299 instead of 9200). See elasticsearch/config for more info:
>>> from pprint import pprint
>>> from p01.elasticsearch import interfaces
>>> from p01.elasticsearch.pool import ServerPool
>>> from p01.elasticsearch.pool import ElasticSearchConnectionPool

>>> servers = ['localhost:45299']
>>> serverPool = ServerPool(servers, retryDelay=10, timeout=5)

>>> import p01.elasticsearch.testing
>>> statusRENormalizer = p01.elasticsearch.testing.statusRENormalizer
ElasticSearchConnectionPool
We need to set up an elasticsearch connection pool:
>>> connectionPool = ElasticSearchConnectionPool(serverPool)
The connection pool stores the connection in a thread local. You can set the re-connection interval, which defaults to 60 seconds:
>>> connectionPool
<ElasticSearchConnectionPool localhost:45299>

>>> connectionPool.reConnectIntervall
60

>>> connectionPool.reConnectIntervall = 30
>>> connectionPool.reConnectIntervall
30
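Because the pool keeps one connection per thread, several worker threads can share the same pool object and each transparently receives its own connection. Below is a minimal sketch of that behavior, assuming the test server above is running; the worker function and the final assertion are purely illustrative:

import threading

from p01.elasticsearch.pool import ServerPool
from p01.elasticsearch.pool import ElasticSearchConnectionPool

pool = ElasticSearchConnectionPool(ServerPool(['localhost:45299']))
connections = {}

def worker(n):
    # pool.connection is resolved per thread via a thread local
    connections[n] = pool.connection

threads = [threading.Thread(target=worker, args=(n,)) for n in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# every thread should have received its own connection object
assert len(set(id(c) for c in connections.values())) == 3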
ElasticSearchConnection
Now we can get a connection from the pool; it is persistent and tracked in a thread local:
>>> conn = connectionPool.connection
>>> conn
<ElasticSearchConnection localhost:45299>
Such a connection provides a server pool which the connection can choose from. If a server goes down, another server gets used. The connection also balances HTTP connections between all servers:
>>> conn.serverPool
<ServerPool retryDelay:10 localhost:45299>

>>> conn.serverPool.info
'localhost:45299'
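For a real deployment you would usually list several servers; the pool then skips servers that are down and retries them after retryDelay seconds, while balancing requests over the alive ones. A sketch with made-up host names:

from p01.elasticsearch.pool import ServerPool
from p01.elasticsearch.pool import ElasticSearchConnectionPool

# two hypothetical elasticsearch nodes; if one goes down, requests go to
# the remaining alive server and the dead one is retried after retryDelay
servers = ['es1.example.com:9200', 'es2.example.com:9200']
serverPool = ServerPool(servers, retryDelay=10, timeout=5)
conn = ElasticSearchConnectionPool(serverPool).connection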
A maxRetries value is also provided. If it is None (the default), the connection uses the number of alive servers as the maximum, i.e. len(self.serverPool.aliveServers):
>>> conn.maxRetries is None
True
Another property, autoRefresh, controls whether refresh is called implicitly when a previous connection call changes the search index, e.g. as an index call would do:
>>> conn.autoRefresh
False
There is also a bulk size setting. When we use the bulk option that some methods provide, the bulkMaxSize value makes sure that no more than the given number of items get cached in the connection before they are sent to the server:
>>> conn.bulkMaxSize
400
Mapping Configuration
Our test setup uses a predefined mapping configuration, which I guess is the common use case in most projects. I'm not really a fan of dynamic mapping, at least when it comes to migration and legacy data handling. But of course dynamic mapping is a nice feature for some use cases, e.g. if you have to index crawled data and offer a search over all (_all) fields. Let's test our predefined mappings:
Up to elasticsearch version 19.1 this would return {}, but newer versions return status 404, so our code raises an exception. This will be fixed in elasticsearch 19.5.
>>> conn.getMapping()
{}
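If your server version raises instead of returning the empty mapping above, a small guard can normalize both behaviors. This is only an illustrative helper; treating any ElasticSearchServerException as "no mapping defined yet" is an assumption:

from p01.elasticsearch.exceptions import ElasticSearchServerException

def getMappingOrNone(conn, index, docType):
    # return the mapping, or None if the server answers with an error
    # (e.g. the 404 described above)
    try:
        return conn.getMapping(index, docType)
    except ElasticSearchServerException:
        return None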
So we don't get a default mapping yet; first we need to index at least one item. Let's index a first job:
>>> job = {'title': u'Wir suchen einen Marketingplaner',
...     'description': u'Wir bieten eine gute Anstellung'}

>>> pprint(conn.index(job, 'testing', 'job', 1))
{u'_id': u'1', u'_index': u'testing', u'_type': u'job', u'_version': 1, u'ok': True}

>>> statusRENormalizer.pprint(conn.getMapping())
{u'testing': {u'job': {u'_all': {u'store': u'yes'}, u'_id': {u'store': u'yes'}, u'_index': {u'enabled': True}, u'_type': {u'store': u'yes'}, u'properties': {u'__name__': {u'boost': 2.0, u'include_in_all': False, u'null_value': u'na', u'type': u'string'}, u'contact': {u'include_in_all': False, u'properties': {u'firstname': {u'include_in_all': False, u'type': u'string'}, u'lastname': {u'include_in_all': False, u'type': u'string'}}}, u'description': {u'include_in_all': True, u'null_value': u'na', u'type': u'string'}, u'location': {u'geohash': True, u'lat_lon': True, u'type': u'geo_point'}, u'published': {u'format': u'date_optional_time', u'type': u'date'}, u'requirements': {u'properties': {u'description': {u'type': u'string'}, u'name': {u'type': u'string'}}}, u'tags': {u'index_name': u'tag', u'type': u'string'}, u'title': {u'boost': 2.0, u'include_in_all': True, u'null_value': u'na', u'type': u'string'}}}}}
Let's define another item with more data and index it:
>>> import datetime
>>> job = {'title': u'Wir suchen einen Buchhalter',
...     'description': u'Wir bieten Ihnen eine gute Anstellung',
...     'requirements': [
...         {'name': u'MBA', 'description': u'MBA Abschluss'}
...     ],
...     'tags': [u'MBA', u'certified'],
...     'published': datetime.datetime(2011, 02, 24, 12, 0, 0),
...     'contact': {
...         'firstname': u'Jessy',
...         'lastname': u'Ineichen',
...     },
...     'location': [-71.34, 41.12]}
>>> pprint(conn.index(job, 'testing', 'job', 2))
{u'_id': u'2', u'_index': u'testing', u'_type': u'job', u'_version': 1, u'ok': True}

>>> import time
>>> time.sleep(1)
get
Now let's get the job from our index by its id. But first refresh our index:
>>> statusRENormalizer.pprint(conn.get(2, "testing", "job")) {u'_id': u'2', u'_index': u'testing', u'_source': {u'contact': {u'firstname': u'Jessy', u'lastname': u'Ineichen'}, u'description': u'Wir bieten Ihnen eine gute Anstellung', u'location': [..., ...], u'published': datetime.datetime(2011, 2, 24, 12, 0), u'requirements': [{u'description': u'MBA Abschluss', u'name': u'MBA'}], u'tags': [u'MBA', u'certified'], u'title': u'Wir suchen einen Buchhalter'}, u'_type': u'job', u'_version': 1, u'exists': True}
search
Now let's also try to search:
>>> response = conn.search("title:Buchhalter", 'testing', 'job') >>> response <SearchResponse testing/job/_search>>>> statusRENormalizer.pprint(response.data) {u'_shards': {u'failed': 0, u'successful': 5, u'total': 5}, u'hits': {u'hits': [{u'_id': u'2', u'_index': u'testing', u'_score': ..., u'_source': {u'contact': {u'firstname': u'Jessy', u'lastname': u'Ineichen'}, u'description': u'Wir bieten Ihnen eine gute Anstellung', u'location': [..., ...], u'published': datetime.datetime(2011, 2, 24, 12, 0), u'requirements': [{u'description': u'MBA Abschluss', u'name': u'MBA'}], u'tags': [u'MBA', u'certified'], u'title': u'Wir suchen einen Buchhalter'}, u'_type': u'job'}], u'max_score': ..., u'total': 1}, u'timed_out': False, u'took': ...}
As you can see, our search response wrapper knows about some important values:
>>> response.start
0
>>> response.size
0
>>> response.total
1
>>> response.pages
1

>>> pprint(response.hits)
[{u'_id': u'2', u'_index': u'testing', u'_score': ..., u'_source': {u'contact': {u'firstname': u'Jessy', u'lastname': u'Ineichen'}, u'description': u'Wir bieten Ihnen eine gute Anstellung', u'location': [..., ...], u'published': datetime.datetime(2011, 2, 24, 12, 0), u'requirements': [{u'description': u'MBA Abschluss', u'name': u'MBA'}], u'tags': [u'MBA', u'certified'], u'title': u'Wir suchen einen Buchhalter'}, u'_type': u'job'}]
Now let's search for more than one job:
>>> response = conn.search("Anstellung", 'testing', 'job') >>> pprint(response.data) {u'_shards': {u'failed': 0, u'successful': 5, u'total': 5}, u'hits': {u'hits': [{u'_id': u'1', u'_index': u'testing', u'_score': ..., u'_source': {u'description': u'Wir bieten eine gute Anstellung', u'title': u'Wir suchen einen Marketingplaner'}, u'_type': u'job'}, {u'_id': u'2', u'_index': u'testing', u'_score': ..., u'_source': {u'contact': {u'firstname': u'Jessy', u'lastname': u'Ineichen'}, u'description': u'Wir bieten Ihnen eine gute Anstellung', u'location': [..., ...], u'published': datetime.datetime(2011, 2, 24, 12, 0), u'requirements': [{u'description': u'MBA Abschluss', u'name': u'MBA'}], u'tags': [u'MBA', u'certified'], u'title': u'Wir suchen einen Buchhalter'}, u'_type': u'job'}], u'max_score': ..., u'total': 2}, u'timed_out': False, u'took': ...}
Now try to limit the search result using the from and size parameters:
>>> params = {'from': 0, 'size': 1}
>>> response = conn.search("Anstellung", 'testing', 'job', **params)
>>> pprint(response.data)
{u'_shards': {u'failed': 0, u'successful': 5, u'total': 5}, u'hits': {u'hits': [{u'_id': u'1', u'_index': u'testing', u'_score': ..., u'_source': {u'description': u'Wir bieten eine gute Anstellung', u'title': u'Wir suchen einen Marketingplaner'}, u'_type': u'job'}], u'max_score': ..., u'total': 2}, u'timed_out': False, u'took': ...}

>>> response.start
0
>>> response.size
1
>>> response.total
2
>>> response.pages
2

>>> params = {'from': 1, 'size': 1}
>>> response = conn.search("Anstellung", 'testing', 'job', **params)
>>> pprint(response.data)
{u'_shards': {u'failed': 0, u'successful': 5, u'total': 5}, u'hits': {u'hits': [{u'_id': u'2', u'_index': u'testing', u'_score': ..., u'_source': {u'contact': {u'firstname': u'Jessy', u'lastname': u'Ineichen'}, u'description': u'Wir bieten Ihnen eine gute Anstellung', u'location': [..., ...], u'published': datetime.datetime(2011, 2, 24, 12, 0), u'requirements': [{u'description': u'MBA Abschluss', u'name': u'MBA'}], u'tags': [u'MBA', u'certified'], u'title': u'Wir suchen einen Buchhalter'}, u'_type': u'job'}], u'max_score': ..., u'total': 2}, u'timed_out': False, u'took': ...}

>>> response.start
1
>>> response.size
1
>>> response.total
2
>>> response.pages
2
As you can see in the sample above, we got only one hit per query because of our size=1 parameter, while both search results report a total of 2, which is what we would get from the server without using size and from.
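Based on these start, size, total and pages values, a small helper can page through a larger result set. The following is just a sketch using the connection and the from/size parameters shown above, with an arbitrary page size:

def iterPages(conn, query, index, docType, pageSize=10):
    # yield one SearchResponse per page until the total is reached
    start = 0
    while True:
        params = {'from': start, 'size': pageSize}
        response = conn.search(query, index, docType, **params)
        if not response.hits:
            break
        yield response
        start += pageSize
        if start >= response.total:
            break

# collect all job titles page by page
titles = []
for page in iterPages(conn, "Anstellung", 'testing', 'job', pageSize=1):
    titles.extend(hit['_source']['title'] for hit in page.hits)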
Index
This test sets up some sample data in its test setup method. After that, a new elasticsearch instance is started in another sandbox for this test. Check the p01/elasticsearch/test.py file for more info about the sample data and the elasticsearch server setup.
We will test whether we can delete an existing index and create it again with the same mapping:
>>> import json
>>> from pprint import pprint

>>> import p01.elasticsearch.testing
>>> statusRENormalizer = p01.elasticsearch.testing.statusRENormalizer
Now let’s define a new elasticsearch connection based on our server pool:
>>> conn = p01.elasticsearch.testing.getTestConnection()
Now we are ready to access the elasticsearch server. Check the status:
>>> statusRENormalizer.pprint(conn.status()) {u'_shards': {u'failed': 0, u'successful': 1, u'total': 1}, u'indices': {u'companies': {u'docs': {u'deleted_docs': 0, u'max_doc': ..., u'num_docs': ...}, u'flush': {u'total': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'index': {u'primary_size': u'...', u'primary_size_in_bytes': ..., u'size': u'...', u'size_in_bytes': ...}, u'merges': {u'current': 0, u'current_docs': 0, u'current_size': u'0b', u'current_size_in_bytes': 0, u'total': 0, u'total_docs': 0, u'total_size': u'0b', u'total_size_in_bytes': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'refresh': {u'total': ..., u'total_time': u'...', u'total_time_in_millis': ...}, u'shards': {u'0': [{u'docs': {u'deleted_docs': 0, u'max_doc': ..., u'num_docs': ...}, u'flush': {u'total': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'index': {u'size': u'...', u'size_in_bytes': ...}, u'merges': {u'current': 0, u'current_docs': 0, u'current_size': u'0b', u'current_size_in_bytes': 0, u'total': 0, u'total_docs': 0, u'total_size': u'0b', u'total_size_in_bytes': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'refresh': {u'total': ..., u'total_time': u'...', u'total_time_in_millis': ...}, u'routing': {u'index': u'companies', u'node': u'...', u'primary': True, u'relocating_node': None, u'shard': 0, u'state': u'STARTED'}, u'state': u'STARTED', u'translog': {u'id': ..., u'operations': 0}}]}, u'translog': {u'operations': 0}}}, u'ok': True}
As you can see, we can check the mapping created by our sample data:
>>> pprint(conn.getMapping('companies', 'company')) {u'company': {u'properties': {u'__name__': {u'type': u'string'}, u'city': {u'type': u'string'}, u'number': {u'ignore_malformed': False, u'type': u'long'}, u'street': {u'type': u'string'}, u'text': {u'type': u'string'}, u'zip': {u'type': u'string'}}}}
And search for the sample data that we added with the sample data generator in our test setup:
>>> pprint(conn.search('street').total)
100
deleteIndex
Now we will delete the index:
>>> conn.deleteIndex('companies')
{u'acknowledged': True, u'ok': True}
As you can see there is no index anymore:
>>> statusRENormalizer.pprint(conn.status())
{u'_shards': {u'failed': 0, u'successful': 0, u'total': 0}, u'indices': {}, u'ok': True}
createIndex
Now we can create the index again. Let’s get our sample data mapping:
>>> import os.path
>>> import json
>>> import p01.elasticsearch
>>> mFile = os.path.join(os.path.dirname(p01.elasticsearch.__file__),
...     'sample', 'config', 'companies', 'company.json')

>>> f = open(mFile)
>>> data = f.read()
>>> f.close()
>>> mappings = json.loads(data)
>>> pprint(mappings)
{u'company': {u'_all': {u'enabled': True, u'store': u'yes'}, u'_id': {u'store': u'yes'}, u'_index': {u'enabled': True}, u'_source': {u'enabled': False}, u'_type': {u'store': u'yes'}, u'properties': {u'__name__': {u'include_in_all': False, u'index': u'not_analyzed', u'store': u'yes', u'type': u'string'}, u'_id': {u'include_in_all': False, u'index': u'no', u'store': u'yes', u'type': u'string'}, u'city': {u'boost': 1.0, u'include_in_all': True, u'index': u'not_analyzed', u'null_value': u'na', u'store': u'yes', u'type': u'string'}, u'street': {u'boost': 1.0, u'include_in_all': True, u'index': u'not_analyzed', u'null_value': u'na', u'store': u'yes', u'type': u'string'}, u'text': {u'boost': 1.0, u'include_in_all': True, u'index': u'not_analyzed', u'null_value': u'na', u'store': u'yes', u'type': u'string'}, u'zip': {u'boost': 1.0, u'include_in_all': True, u'index': u'not_analyzed', u'null_value': u'na', u'store': u'yes', u'type': u'string'}}}}
Now we can create a new index with the given mapping:
>>> conn.createIndex('companies', mappings=mappings)
{u'acknowledged': True, u'ok': True}
As you can see, our index and mapping are back again:
>>> statusRENormalizer.pprint(conn.status()) {u'_shards': {u'failed': 0, u'successful': 1, u'total': 1}, u'indices': {u'companies': {u'docs': {u'deleted_docs': 0, u'max_doc': ..., u'num_docs': ...}, u'flush': {u'total': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'index': {u'primary_size': u'...', u'primary_size_in_bytes': ..., u'size': u'...', u'size_in_bytes': ...}, u'merges': {u'current': 0, u'current_docs': 0, u'current_size': u'0b', u'current_size_in_bytes': 0, u'total': 0, u'total_docs': 0, u'total_size': u'0b', u'total_size_in_bytes': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'refresh': {u'total': ..., u'total_time': u'...', u'total_time_in_millis': ...}, u'shards': {u'0': [{u'docs': {u'deleted_docs': 0, u'max_doc': ..., u'num_docs': ...}, u'flush': {u'total': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'index': {u'size': u'...', u'size_in_bytes': ...}, u'merges': {u'current': 0, u'current_docs': 0, u'current_size': u'0b', u'current_size_in_bytes': 0, u'total': 0, u'total_docs': 0, u'total_size': u'0b', u'total_size_in_bytes': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'refresh': {u'total': ..., u'total_time': u'...', u'total_time_in_millis': ...}, u'routing': {u'index': u'companies', u'node': u'...', u'primary': True, u'relocating_node': None, u'shard': 0, u'state': u'STARTED'}, u'state': u'STARTED', u'translog': {u'id': ..., u'operations': 0}}]}, u'translog': {u'operations': 0}}}, u'ok': True}>>> pprint(conn.getMapping('companies', 'company')) {u'company': {u'_all': {u'store': u'yes'}, u'_id': {u'store': u'yes'}, u'_index': {u'enabled': True}, u'_source': {u'enabled': False}, u'_type': {u'store': u'yes'}, u'properties': {u'__name__': {u'include_in_all': False, u'index': u'not_analyzed', u'store': u'yes', u'type': u'string'}, u'city': {u'include_in_all': True, u'index': u'not_analyzed', u'null_value': u'na', u'store': u'yes', u'type': u'string'}, u'street': {u'include_in_all': True, u'index': u'not_analyzed', u'null_value': u'na', u'store': u'yes', u'type': u'string'}, u'text': {u'include_in_all': True, u'index': u'not_analyzed', u'null_value': u'na', u'store': u'yes', u'type': u'string'}, u'zip': {u'include_in_all': True, u'index': u'not_analyzed', u'null_value': u'na', u'store': u'yes', u'type': u'string'}}}}
As you can see the index is empty:
>>> pprint(conn.search('street').total)
0
Mapping
Note: this test will start and run an elasticsearch server on port 45299!
This test experiments with some mapping configurations. Since the elasticsearch documentation is not very clear to me, I try to find out here how the mapping part has to be done.
>>> from pprint import pprint
>>> from p01.elasticsearch import interfaces
>>> from p01.elasticsearch.pool import ServerPool
>>> from p01.elasticsearch.pool import ElasticSearchConnectionPool
Set up a connection:
>>> servers = ['localhost:45299']
>>> serverPool = ServerPool(servers)
>>> connectionPool = ElasticSearchConnectionPool(serverPool)
>>> conn = connectionPool.connection
Let’s setup a mapping definition:
>>> mapping = {
...     'item': {
...         'properties': {
...             'boolean': {
...                 'type': 'boolean'
...             },
...             'date': {
...                 'type': 'date'
...             },
...             'datetime': {
...                 'type': 'date'
...             },
...             'double': {
...                 'type': 'double'
...             },
...             'float': {
...                 'type': 'float'
...             },
...             'integer': {
...                 'type': 'integer'
...             },
...             'long': {
...                 'type': 'long'
...             },
...             'string': {
...                 'type': 'string',
...                 'null_value' : 'nada'
...             },
...         }
...     }
... }
Now let's add the mapping using our putMapping method and call refresh:
>>> conn.putMapping(mapping, 'test-mapping', 'item')
Traceback (most recent call last):
...
IndexMissingException: [test-mapping] missing
As you can see, there was an exception because our index doesn't exist yet. Let's add our test-mapping index and try again:
>>> conn.createIndex('test-mapping')
{u'acknowledged': True, u'ok': True}

>>> pprint(conn.refresh('test-mapping', 4))
{u'_shards': {u'failed': 0, u'successful': ..., u'total': 10}, u'ok': True}

>>> conn.putMapping(mapping, 'test-mapping', 'item')
{u'acknowledged': True, u'ok': True}

>>> pprint(conn.refresh('test-mapping', 4))
{u'_shards': {u'failed': 0, u'successful': ..., u'total': 10}, u'ok': True}
And get our mapping:
>>> pprint(conn.getMapping('test-mapping', 'item'), width=60) {u'item': {u'properties': {u'boolean': {u'type': u'boolean'}, u'date': {u'format': u'dateOptionalTime', u'type': u'date'}, u'datetime': {u'format': u'dateOptionalTime', u'type': u'date'}, u'double': {u'type': u'double'}, u'float': {u'type': u'float'}, u'integer': {u'type': u'integer'}, u'long': {u'type': u'long'}, u'string': {u'null_value': u'nada', u'type': u'string'}}}}
Now let’s index a new item:
>>> import datetime
>>> doc = {'boolean': True,
...     'datetime': datetime.datetime(2011, 02, 24, 12, 0, 0),
...     'date': datetime.date(2011, 02, 24),
...     'float': float(42),
...     'integer': int(42),
...     'long': long(42*10000000000000000),
...     'string': 'string'}
>>> conn.index(doc, 'test-mapping', 'item', 1)
{u'_type': u'item', u'_id': u'1', u'ok': True, u'_version': 1, u'_index': u'test-mapping'}
Refresh the index:
>>> pprint(conn.refresh('test-mapping', 4))
{u'_shards': {u'failed': 0, u'successful': 5, u'total': 10}, u'ok': True}
And search for our indexed items:
>>> response = conn.search('string', 'test-mapping', 'item')
>>> data = response.data
>>> pprint(data)
{u'_shards': {u'failed': 0, u'successful': 5, u'total': 5}, u'hits': {u'hits': [{u'_id': u'1', u'_index': u'test-mapping', u'_score': ..., u'_source': {u'boolean': True, u'date': datetime.datetime(2011, 2, 24, 0, 0), u'datetime': datetime.datetime(2011, 2, 24, 12, 0), u'float': 42.0, u'integer': 42, u'long': 420000000000000000L, u'string': u'string'}, u'_type': u'item'}], u'max_score': ..., u'total': 1}, u'timed_out': False, u'took': ...}
Now check our values:
>>> source = data['hits']['hits'][0]['_source']
>>> pprint(source)
{u'boolean': True, u'date': datetime.datetime(2011, 2, 24, 0, 0), u'datetime': datetime.datetime(2011, 2, 24, 12, 0), u'float': 42.0, u'integer': 42, u'long': 420000000000000000L, u'string': u'string'}

>>> isinstance(source['boolean'], bool)
True
>>> isinstance(source['datetime'], datetime.datetime)
True
>>> isinstance(source['date'], datetime.date)
True
>>> isinstance(source['float'], float)
True
>>> isinstance(source['integer'], int)
True
>>> isinstance(source['long'], long)
True
>>> isinstance(source['string'], basestring)
True
>>> isinstance(source['string'], unicode)
True
Note that the date value is also a datetime instance, and the datetime value is also a date instance:
>>> isinstance(source['date'], datetime.datetime)
True
>>> isinstance(source['datetime'], datetime.date)
True
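Since a plain date comes back as a datetime.datetime at midnight, you may want to normalize it after reading. A small illustrative snippet, continuing with the source dict from above:

# normalize the stored date back to a plain datetime.date if needed
published = source['date']
if isinstance(published, datetime.datetime):
    published = published.date()
assert published == datetime.date(2011, 2, 24)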
Scan Search Type
Note: this test will start and run an elasticsearch server on port 45299!
Let's just do some simple tests without using a connection pool.
>>> from pprint import pprint
>>> from p01.elasticsearch.connection import ElasticSearchConnection
>>> from p01.elasticsearch.exceptions import ElasticSearchServerException
>>> from p01.elasticsearch.pool import ServerPool

>>> servers = ['localhost:45299']
>>> serverPool = ServerPool(servers)
Now we can set up a connection, which is persistent and tracked in a thread local.
>>> conn = ElasticSearchConnection(serverPool)
Set up a test index and add a few documents:
>>> conn.createIndex('scanning')
{u'acknowledged': True, u'ok': True}

>>> for i in range(1000):
...     _id = unicode(i)
...     doc = {'_id': _id, 'dummy': u'dummy'}
...     ignored = conn.index(doc, 'scanning', 'doc')

>>> conn.refresh('scanning')
{u'ok': True, u'_shards': {u'successful': 5, u'failed': 0, u'total': 10}}
Let’s show how we can batch large search results with our scan method.
>>> pprint(conn.search('dummy', 'scanning').total)
1000

>>> result = list(conn.scan('dummy', 'scanning'))
>>> len(result)
1000

>>> pprint(sorted(result)[:5])
[{u'_id': u'0', u'_index': u'scanning', u'_score': 0.0, u'_source': {u'_id': u'0', u'dummy': u'dummy'}, u'_type': u'doc'}, {u'_id': u'1', u'_index': u'scanning', u'_score': 0.0, u'_source': {u'_id': u'1', u'dummy': u'dummy'}, u'_type': u'doc'}, {u'_id': u'10', u'_index': u'scanning', u'_score': 0.0, u'_source': {u'_id': u'10', u'dummy': u'dummy'}, u'_type': u'doc'}, {u'_id': u'100', u'_index': u'scanning', u'_score': 0.0, u'_source': {u'_id': u'100', u'dummy': u'dummy'}, u'_type': u'doc'}, {u'_id': u'101', u'_index': u'scanning', u'_score': 0.0, u'_source': {u'_id': u'101', u'dummy': u'dummy'}, u'_type': u'doc'}]
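Since the scan result can be consumed as an iterator, a large result set can also be processed in fixed-size chunks instead of building one big list. This is only a sketch using the connection from above; the chunk size and the processing function are placeholders, and the final count assumes the 1000 documents indexed earlier:

import itertools

def processChunk(chunk):
    # placeholder for whatever you do with a batch of hits
    return len(chunk)

hits = iter(conn.scan('dummy', 'scanning'))
processed = 0
while True:
    chunk = list(itertools.islice(hits, 100))
    if not chunk:
        break
    processed += processChunk(chunk)
assert processed == 1000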
Bulk
Note: this test will start and run an elasticsearch server on port 45299!
This test shows how to index items using the bulk concept.
>>> from pprint import pprint
>>> from p01.elasticsearch import interfaces
>>> from p01.elasticsearch.pool import ServerPool
>>> from p01.elasticsearch.pool import ElasticSearchConnectionPool

>>> servers = ['localhost:45299']
>>> serverPool = ServerPool(servers)
Now we can get a connection from the pool; it is persistent and tracked in a thread local:
>>> connectionPool = ElasticSearchConnectionPool(serverPool)
>>> conn = connectionPool.connection
>>> conn
<ElasticSearchConnection localhost:45299>
Let's set the bulkMaxSize to 5. This means that after 5 items have been queued, the bulkIndex method will implicitly send an index request to the server:
>>> conn.bulkMaxSize = 5
>>> conn.bulkMaxSize
5
Let’s bulk index some items:
>>> doc = {'title': u'Wir suchen einen Marketingplaner',
...     'description': u'Wir bieten Ihnen eine gute Anstellung'}
>>> conn.bulkIndex(doc, 'testing', 'job', 1)

>>> doc = {'title': u'Wir suchen einen Buchhalter',
...     'description': u'Wir bieten Ihnen eine gute Anstellung'}
>>> conn.bulkIndex(doc, 'testing', 'job', 2)
Now commit our bulk data, even though we haven't indexed the full bulkMaxSize amount yet:
>>> pprint(conn.bulkCommit())
{u'items': [{u'index': {u'_id': u'1', u'_index': u'testing', u'_type': u'job', u'_version': 1, u'ok': True}}, {u'index': {u'_id': u'2', u'_index': u'testing', u'_type': u'job', u'_version': 1, u'ok': True}}], u'took': ...}

>>> conn.bulkCounter
0
Now we search the items:
>>> response = conn.search("Anstellung", 'testing', 'job') >>> pprint(response.data) {u'_shards': {u'failed': 0, u'successful': 5, u'total': 5}, u'hits': {u'hits': [], u'max_score': None, u'total': 0}, u'timed_out': False, u'took': ...}
As you can see, the data is not searchable yet because we didn't use the refresh parameter. Let's call refresh now:
>>> conn.refresh('testing')
{u'ok': True, u'_shards': {u'successful': 5, u'failed': 0, u'total': 10}}
And search again:
>>> response = conn.search("Anstellung", 'testing', 'job') >>> pprint(response.data) {u'_shards': {u'failed': 0, u'successful': 5, u'total': 5}, u'hits': {u'hits': [{u'_id': u'1', u'_index': u'testing', u'_score': ..., u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung', u'title': u'Wir suchen einen Marketingplaner'}, u'_type': u'job'}, {u'_id': u'2', u'_index': u'testing', u'_score': ..., u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung', u'title': u'Wir suchen einen Buchhalter'}, u'_type': u'job'}], u'max_score': ..., u'total': 2}, u'timed_out': False, u'took': ...}
Let’s index more items till we reach the bulkMaxSize:
>>> len(conn.bulkItems)
0

>>> doc = {'title': u'Wir suchen einen Koch',
...     'description': u'Wir bieten Ihnen eine gute Anstellung'}
>>> conn.bulkIndex(doc, 'testing', 'job', 3)
>>> conn.bulkCounter
1

>>> doc = {'title': u'Wir suchen einen Sachbearbeiter',
...     'description': u'Wir bieten Ihnen eine gute Anstellung'}
>>> conn.bulkIndex(doc, 'testing', 'job', 4)
>>> conn.bulkCounter
2

>>> doc = {'title': u'Wir suchen einen Mechaniker',
...     'description': u'Wir bieten Ihnen eine gute Anstellung'}
>>> conn.bulkIndex(doc, 'testing', 'job', 5)
>>> conn.bulkCounter
3

>>> doc = {'title': u'Wir suchen einen Exportfachmann',
...     'description': u'Wir bieten Ihnen eine gute Anstellung'}
>>> conn.bulkIndex(doc, 'testing', 'job', 6)
>>> conn.bulkCounter
4
Now our bulkMaxSize setting forces a commit of the buffered data:
>>> doc = {'title': u'Wir suchen einen Entwickler',
...     'description': u'Wir bieten Ihnen eine gute Anstellung'}
>>> pprint(conn.bulkIndex(doc, 'testing', 'job', 7))
{u'items': [{u'index': {u'_id': u'3', u'_index': u'testing', u'_type': u'job', u'_version': 1, u'ok': True}}, {u'index': {u'_id': u'4', u'_index': u'testing', u'_type': u'job', u'_version': 1, u'ok': True}}, {u'index': {u'_id': u'5', u'_index': u'testing', u'_type': u'job', u'_version': 1, u'ok': True}}, {u'index': {u'_id': u'6', u'_index': u'testing', u'_type': u'job', u'_version': 1, u'ok': True}}, {u'index': {u'_id': u'7', u'_index': u'testing', u'_type': u'job', u'_version': 1, u'ok': True}}], u'took': ...}
Just wait until the server refreshes by itself, which by default happens every second:
>>> import time
>>> time.sleep(1)

>>> len(conn.bulkItems)
0
As you can see, we have all 7 items indexed:
>>> response = conn.search("Anstellung", 'testing', 'job') >>> pprint(response.data) {u'_shards': {u'failed': 0, u'successful': 5, u'total': 5}, u'hits': {u'hits': [{u'_id': u'1', u'_index': u'testing', u'_score': ..., u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung', u'title': u'Wir suchen einen Marketingplaner'}, u'_type': u'job'}, {u'_id': u'6', u'_index': u'testing', u'_score': ..., u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung', u'title': u'Wir suchen einen Exportfachmann'}, u'_type': u'job'}, {u'_id': u'2', u'_index': u'testing', u'_score': ..., u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung', u'title': u'Wir suchen einen Buchhalter'}, u'_type': u'job'}, {u'_id': u'7', u'_index': u'testing', u'_score': ..., u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung', u'title': u'Wir suchen einen Entwickler'}, u'_type': u'job'}, {u'_id': u'4', u'_index': u'testing', u'_score': ..., u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung', u'title': u'Wir suchen einen Sachbearbeiter'}, u'_type': u'job'}, {u'_id': u'5', u'_index': u'testing', u'_score': ..., u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung', u'title': u'Wir suchen einen Mechaniker'}, u'_type': u'job'}, {u'_id': u'3', u'_index': u'testing', u'_score': ..., u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung', u'title': u'Wir suchen einen Koch'}, u'_type': u'job'}], u'max_score': ..., u'total': 7}, u'timed_out': False, u'took': ...}
Simple indexing and search
Note: this test will start and run an elasticsearch server on port 45299!
This test uses no predefined mappings. Let's just do some simple tests without using a connection pool.
>>> from pprint import pprint
>>> from p01.elasticsearch.connection import ElasticSearchConnection
>>> from p01.elasticsearch.exceptions import ElasticSearchServerException
>>> from p01.elasticsearch.pool import ServerPool

>>> import p01.elasticsearch.testing
>>> statusRENormalizer = p01.elasticsearch.testing.statusRENormalizer

>>> servers = ['localhost:45299']
>>> serverPool = ServerPool(servers)
Now we can set up a connection, which is persistent and tracked in a thread local.
>>> conn = ElasticSearchConnection(serverPool)
Add a few documents:
>>> pprint(conn.index({"name":"Document One"}, "testdocs", "doc", 1)) {u'_id': u'1', u'_index': u'testdocs', u'_type': u'doc', u'_version': 1, u'ok': True}>>> pprint(conn.index({"name":"Document Two"}, "testdocs", "doc", 2)) {u'_id': u'2', u'_index': u'testdocs', u'_type': u'doc', u'_version': 1, u'ok': True}
Note that we call refresh here, which ensures that the documents get indexed on the server side. Normally this should not be done explicitly in a production setup; by default the elasticsearch server is configured to refresh every second on the server side:
>>> pprint(conn.refresh("testdocs")) {u'_shards': {u'failed': 0, u'successful': 5, u'total': 10}, u'ok': True}
Get one:
>>> pprint(conn.get(1, "testdocs", "doc")) {u'_id': u'1', u'_index': u'testdocs', u'_source': {u'name': u'Document One'}, u'_type': u'doc', u'_version': 1, u'exists': True}
Count the documents:
>>> pprint(conn.count("name:Document One")) {u'_shards': {u'failed': 0, u'successful': 5, u'total': 5}, u'count': 2}>>> pprint(conn.count("name:Document")) {u'_shards': {u'failed': 0, u'successful': 5, u'total': 5}, u'count': 2}
Search a document:
>>> response = conn.search("name:Document One") >>> pprint(response.data) {u'_shards': {u'failed': 0, u'successful': 5, u'total': 5}, u'hits': {u'hits': [{u'_id': u'1', u'_index': u'testdocs', u'_score': 0.2712221, u'_source': {u'name': u'Document One'}, u'_type': u'doc'}, {u'_id': u'2', u'_index': u'testdocs', u'_score': 0.028130025, u'_source': {u'name': u'Document Two'}, u'_type': u'doc'}], u'max_score': 0.2712221, u'total': 2}, u'timed_out': False, u'took': ...}
More like this:
>>> pprint(conn.index({"name":"Document Three"}, "testdocs", "doc", 3)) {u'_id': u'3', u'_index': u'testdocs', u'_type': u'doc', u'_version': 1, u'ok': True}>>> pprint(conn.refresh("testdocs")) {u'_shards': {u'failed': 0, u'successful': 5, u'total': 10}, u'ok': True}>>> pprint(conn.moreLikeThis(1, "testdocs", "doc", ... fields='name', min_term_freq=1, min_doc_freq=1)) {u'_shards': {u'failed': 0, u'successful': 5, u'total': 5}, u'hits': {u'hits': [{u'_id': u'2', u'_index': u'testdocs', u'_score': 0.19178301, u'_source': {u'name': u'Document Two'}, u'_type': u'doc'}, {u'_id': u'3', u'_index': u'testdocs', u'_score': 0.19178301, u'_source': {u'name': u'Document Three'}, u'_type': u'doc'}], u'max_score': 0.19178301, u'total': 2}, u'timed_out': False, u'took': ...}
Delete Document Two:
>>> pprint(conn.delete('2', "testdocs", "doc")) {u'_id': u'2', u'_index': u'testdocs', u'_type': u'doc', u'_version': 2, u'found': True, u'ok': True}
Delete Document Three:
>>> pprint(conn.delete('3', "testdocs", "doc")) {u'_id': u'3', u'_index': u'testdocs', u'_type': u'doc', u'_version': 2, u'found': True, u'ok': True}
Delete the index:
>>> pprint(conn.deleteIndex("testdocs")) {u'acknowledged': True, u'ok': True}
Create a new index:
>>> pprint(conn.createIndex("testdocs")) {u'acknowledged': True, u'ok': True}
Try to create the index again which will fail:
>>> conn.createIndex("testdocs") Traceback (most recent call last): ... IndexAlreadyExistsException: Already exists
As you can see, the error provides an error message:
>>> try: ... conn.createIndex("testdocs") ... except ElasticSearchServerException, e: ... e.args[0] 'Already exists'
Add a new mapping:
>>> mapping = {"doc" : {"properties" : ... {"name" : {"type" : "string", "store" : "yes"}}}} >>> pprint(conn.putMapping(mapping, 'testdocs', 'doc')) {u'acknowledged': True, u'ok': True}
Get the status:
>>> statusRENormalizer.pprint(conn.status("testdocs")) {u'_shards': {u'failed': 0, u'successful': 5, u'total': 10}, u'indices': {u'testdocs': {u'docs': {u'deleted_docs': 0, u'max_doc': ..., u'num_docs': ...}, u'flush': {u'total': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'index': {u'primary_size': u'...', u'primary_size_in_bytes': ..., u'size': u'...', u'size_in_bytes': ...}, u'merges': {u'current': 0, u'current_docs': 0, u'current_size': u'0b', u'current_size_in_bytes': 0, u'total': 0, u'total_docs': 0, u'total_size': u'0b', u'total_size_in_bytes': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'refresh': {u'total': ..., u'total_time': u'...', u'total_time_in_millis': ...}, u'shards': {u'0': [{u'docs': {u'deleted_docs': 0, u'max_doc': ..., u'num_docs': ...}, u'flush': {u'total': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'index': {u'size': u'...', u'size_in_bytes': ...}, u'merges': {u'current': 0, u'current_docs': 0, u'current_size': u'0b', u'current_size_in_bytes': 0, u'total': 0, u'total_docs': 0, u'total_size': u'0b', u'total_size_in_bytes': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'refresh': {u'total': ..., u'total_time': u'...', u'total_time_in_millis': ...}, u'routing': {u'index': u'testdocs', u'node': u'...', u'primary': True, u'relocating_node': None, u'shard': 0, u'state': u'STARTED'}, u'state': u'STARTED', u'translog': {u'id': ..., u'operations': 0}}], u'1': [{u'docs': {u'deleted_docs': 0, u'max_doc': ..., u'num_docs': ...}, u'flush': {u'total': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'index': {u'size': u'...', u'size_in_bytes': ...}, u'merges': {u'current': 0, u'current_docs': 0, u'current_size': u'0b', u'current_size_in_bytes': 0, u'total': 0, u'total_docs': 0, u'total_size': u'0b', u'total_size_in_bytes': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'refresh': {u'total': ..., u'total_time': u'...', u'total_time_in_millis': ...}, u'routing': {u'index': u'testdocs', u'node': u'...', u'primary': True, u'relocating_node': None, u'shard': 1, u'state': u'STARTED'}, u'state': u'STARTED', u'translog': {u'id': ..., u'operations': 0}}], u'2': [{u'docs': {u'deleted_docs': 0, u'max_doc': ..., u'num_docs': ...}, u'flush': {u'total': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'index': {u'size': u'...', u'size_in_bytes': ...}, u'merges': {u'current': 0, u'current_docs': 0, u'current_size': u'0b', u'current_size_in_bytes': 0, u'total': 0, u'total_docs': 0, u'total_size': u'0b', u'total_size_in_bytes': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'refresh': {u'total': ..., u'total_time': u'...', u'total_time_in_millis': ...}, u'routing': {u'index': u'testdocs', u'node': u'...', u'primary': True, u'relocating_node': None, u'shard': 2, u'state': u'STARTED'}, u'state': u'STARTED', u'translog': {u'id': ..., u'operations': 0}}], u'3': [{u'docs': {u'deleted_docs': 0, u'max_doc': ..., u'num_docs': ...}, u'flush': {u'total': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'index': {u'size': u'...', u'size_in_bytes': ...}, u'merges': {u'current': 0, u'current_docs': 0, u'current_size': u'0b', u'current_size_in_bytes': 0, u'total': 0, u'total_docs': 0, u'total_size': u'0b', u'total_size_in_bytes': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'refresh': {u'total': ..., u'total_time': u'...', u'total_time_in_millis': ...}, u'routing': {u'index': u'testdocs', u'node': u'...', u'primary': True, u'relocating_node': None, u'shard': 3, u'state': u'STARTED'}, u'state': 
u'STARTED', u'translog': {u'id': ..., u'operations': 0}}], u'4': [{u'docs': {u'deleted_docs': 0, u'max_doc': ..., u'num_docs': ...}, u'flush': {u'total': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'index': {u'size': u'...', u'size_in_bytes': ...}, u'merges': {u'current': 0, u'current_docs': 0, u'current_size': u'0b', u'current_size_in_bytes': 0, u'total': 0, u'total_docs': 0, u'total_size': u'0b', u'total_size_in_bytes': 0, u'total_time': u'...', u'total_time_in_millis': ...}, u'refresh': {u'total': ..., u'total_time': u'...', u'total_time_in_millis': ...}, u'routing': {u'index': u'testdocs', u'node': u'...', u'primary': True, u'relocating_node': None, u'shard': 4, u'state': u'STARTED'}, u'state': u'STARTED', u'translog': {u'id': ..., u'operations': 0}}]}, u'translog': {u'operations': 0}}}, u'ok': True}
Test adding with automatic id generation.
>>> pprint(conn.index({"name":"Document Four"}, "testdocs", "doc")) Traceback (most recent call last): ... ValueError: You must explicit define id=None without doc['_id']
As you can see, this requires that we explicitly set id=None:
>>> pprint(conn.index({"name":"Document Four"}, "testdocs", "doc", id=None)) {u'_id': u'...', u'_index': u'testdocs', u'_type': u'doc', u'_version': 1, u'ok': True}
The reason for requiring an explicit id=None is that we also support doc['_id'] as the id:
>>> pprint(conn.index({"name":"Document Five", "_id":"5"}, "testdocs", "doc")) {u'_id': u'...', u'_index': u'testdocs', u'_type': u'doc', u'_version': 1, u'ok': True}
CHANGES
0.5.2 (2013-06-28)
bugfix: improve error handling. Use json response string if no error message is given.
0.5.1 (2012-12-22)
implemented put settings (putSettings) method
fix tests based on changed elasticsearch 0.20.1 output
switch to p01.recipe.setup:importchecker
0.5.0 (2012-11-18)
initial release