Welcome to RxSci’s documentation!¶
Get Started¶
This packages exposes one adapter per consul API family.
KV Store¶
One can Access the KV store with the kv adapter:
The constructor of the kv adapter takes an observable of http_response as input, and returns an Adapter object. The Adapter object contains two properties:
sink: An observable of http requests (requests sent to the consul endpoint)
api: An accessor to the KV Store APIs
Read the value of a key in the KV Store¶
Reading a key is done the following way:
import cyclotron_consul.kv as kv
kv_adapter = kv.adapter(source.http.response)
value = kv_adapter.api.read_key("http://localhost:8500", "test")
Where value is an observable that emits a single item with a Key/Value object.
Here is a full example:
from collections import namedtuple
import rx
import rx.operators as ops
from cyclotron import Component
from cyclotron.asyncio.runner import run
import cyclotron_aiohttp.http as http
import cyclotron_std.sys.stdout as stdout
import cyclotron_consul.kv as kv
ReadKeySource = namedtuple('ReadKeySource', ['http'])
ReadKeySink = namedtuple('ReadKeySink', ['http', 'stdout'])
ReadKeyDrivers = namedtuple('ReadKeyDrivers', ['http', 'stdout'])
def read_key(source):
kv_adapter = kv.adapter(source.http.response)
value = kv_adapter.api.read_key("http://localhost:8500", "test").pipe(
ops.map(lambda i: "key: {}, value: {}".format(i.key, i.value))
)
return ReadKeySink(
http=http.Sink(request=kv_adapter.sink),
stdout=stdout.Sink(data=value),
)
def main():
run(Component(call=read_key, input=ReadKeySource),
ReadKeyDrivers(
http=http.make_driver(),
stdout=stdout.make_driver(),
)
)
if __name__ == '__main__':
main()
Watch the value of a key in the KV Store¶
This is similar to reading a key, except that the result observable emits a new items each time the value of the key changes:
import cyclotron_consul.kv as kv
kv_adapter = kv.adapter(source.http.response)
value = kv_adapter.api.watch_key("http://localhost:8500", "test")
Where value is an observable that emits an item with a Key/Value object each time the value of the key is updated on consul.
Here is a full example:
from collections import namedtuple
import rx
import rx.operators as ops
from cyclotron import Component
from cyclotron.asyncio.runner import run
import cyclotron_aiohttp.http as http
import cyclotron_std.sys.stdout as stdout
import cyclotron_consul.kv as kv
ReadKeySource = namedtuple('ReadKeySource', ['http'])
ReadKeySink = namedtuple('ReadKeySink', ['http', 'stdout'])
ReadKeyDrivers = namedtuple('ReadKeyDrivers', ['http', 'stdout'])
def read_key(source):
kv_adapter = kv.adapter(source.http.response)
value = kv_adapter.api.watch_key("http://localhost:8500", "test").pipe(
ops.map(lambda i: "key: {}, value: {}".format(i.key, i.value)),
)
return ReadKeySink(
http=http.Sink(request=kv_adapter.sink),
stdout=stdout.Sink(data=value),
)
def main():
run(Component(call=read_key, input=ReadKeySource),
ReadKeyDrivers(
http=http.make_driver(),
stdout=stdout.make_driver(),
)
)
if __name__ == '__main__':
main()
Reference¶
-
class
cyclotron_consul.kv.
Adapter
(sink, api)¶ -
property
api
¶ Alias for field number 1
-
property
sink
¶ Alias for field number 0
-
property
-
class
cyclotron_consul.kv.
Api
(read_key, watch_key)¶ -
property
read_key
¶ Alias for field number 0
-
property
watch_key
¶ Alias for field number 1
-
property
-
class
cyclotron_consul.kv.
KeyValue
(key, value)¶ -
property
key
¶ Alias for field number 0
-
property
value
¶ Alias for field number 1
-
property
-
cyclotron_consul.kv.
adapter
(source)¶ Creates a consul adapter for the KV API.
- Parameters
source – an aiohttp response stream.
- Returns
A Client object
-
cyclotron_consul.kv.
read_key
(http_client, endpoint, key)¶ Reads a key on the specified endpoint
- Parameters
http_client – An instance of http client. This parameter is already binded when called from the adapter.
endpoint – the consul server full url
key – The key to read
- Returns
On observable that emits a single KeyValue item if the request succeeds. Otherwise the observable completes on error.
Example
>>>import cyclotron_consul.kv as kv >>>client = kv.client(http_source) >>>client.api.read_key(“http://localhost:8500”, “mykey”).subscribe( >>> on_next=print >>>) >>># forward client.sink to http driver sink
-
cyclotron_consul.kv.
watch_key
(http_client, endpoint, key)¶ Reads a key on the specified endpoint, and watch for updates.
- Parameters
http_client – An instance of http client. This parameter is already binded when called from the adapter.
endpoint – the consul server full url
key – The key to read
- Returns
On observable that emits a KeyValue item each time the value associated to the key is updated on consul. In case of error, the observable completes on error.
Example
>>>import cyclotron_consul.kv as kv >>>client = kv.client(http_source) >>>client.api.watch_key(“http://localhost:8500”, “mykey”).subscribe( >>> on_next=print >>>) >>># forward client.sink to http driver sink