-
-
Save thanksmister/76d8ed3385b9f9f86e74 to your computer and use it in GitHub Desktop.
| // data sevice class calling API and returning list of exchanges | |
| public class DataService | |
| { | |
| private PublishSubject<List<Exchange>> exchangesRequest; | |
| public DataService() | |
| {} | |
| // Get exchanges without any polling which returns results to subscription on time | |
| public Subscription getExchanges(final Observer<List<Exchange>> observer) | |
| { | |
| // join the request if instantiated | |
| if(exchangesRequest != null) { | |
| return exchangesRequest.subscribe(observer); | |
| } | |
| exchangesRequest = PublishSubject.create(); | |
| exchangesRequest.subscribe(new Observer<List<Exchange>>() { | |
| @Override | |
| public void onCompleted() { | |
| } | |
| @Override | |
| public void onError(Throwable throwable) { | |
| } | |
| @Override | |
| public void onNext(List<Exchange> exchanges) { | |
| // Cache data in database | |
| } | |
| }); | |
| Subscription subscription = exchangesRequest.subscribe(observer); | |
| getExchangesObservable() | |
| .subscribeOn(Schedulers.newThread()) | |
| .observeOn(AndroidSchedulers.mainThread()) | |
| .subscribe(exchangesRequest); | |
| return subscription; | |
| } | |
| // Get exchanges on a timer every 5 minutes, problem with this is stops on error and emmits two results every time it runs | |
| public Subscription getExchangesTimer(final Observer<List<Exchange>> observer) | |
| { | |
| // join the request if instantiated | |
| if(exchangesRequest != null) { | |
| return exchangesRequest.subscribe(observer); | |
| } | |
| exchangesRequest = PublishSubject.create(); | |
| exchangesRequest.subscribe(new Observer<List<Exchange>>() { | |
| @Override | |
| public void onCompleted() { | |
| } | |
| @Override | |
| public void onError(Throwable throwable) { | |
| } | |
| @Override | |
| public void onNext(List<Exchange> exchanges) { | |
| // Cache data in database | |
| } | |
| }); | |
| // TODO this returns twice for each time it runs, also stops polling on error | |
| Subscription subscription = exchangesRequest.subscribe(observer); | |
| Observable.timer(0, (2 * 60 * 1000), TimeUnit.MILLISECONDS).timeInterval() | |
| .flatMap(new Func1<TimeInterval<Long>, Observable<List<Exchange>>>() { | |
| @Override | |
| public Observable<List<Exchange>> call(TimeInterval<Long> longTimeInterval) { | |
| return getExchangesObservable(); | |
| } | |
| }) | |
| .subscribeOn(Schedulers.newThread()) | |
| .observeOn(AndroidSchedulers.mainThread()) | |
| .subscribe(exchangesRequest); | |
| return subscription; | |
| } | |
| // get exchanges scheduled periodically, problem with this approach is runs continuously ignoring interval and explodes application | |
| public Subscription getExchangesPeriodically(final Observer<List<Exchange>> observer) | |
| { | |
| // join the request if instantiated | |
| if(exchangesRequest != null) { | |
| return exchangesRequest.subscribe(observer); | |
| } | |
| exchangesRequest = PublishSubject.create(); | |
| exchangesRequest.subscribe(new Observer<List<Exchange>>() { | |
| @Override | |
| public void onCompleted() { | |
| } | |
| @Override | |
| public void onError(Throwable throwable) { | |
| } | |
| @Override | |
| public void onNext(List<Exchange> exchanges) { | |
| // Cache data in database | |
| } | |
| }); | |
| Subscription subscription = exchangesRequest.subscribe(observer); | |
| Scheduler.Worker worker = Schedulers.newThread().createWorker(); | |
| worker.schedulePeriodically(new Action0() { | |
| @Override | |
| public void call() { | |
| getExchangesObservable() | |
| .subscribeOn(Schedulers.newThread()) | |
| .observeOn(AndroidSchedulers.mainThread()) | |
| .subscribe(exchangesRequest); | |
| } | |
| }, 0, (2 * 60 * 1000), TimeUnit.MILLISECONDS); | |
| return subscription; | |
| } | |
| // Uses a Schedulers.io().createWorker() to run the service call periodically. | |
| public Subscription getExchangesWorker(final Observer<List<Exchange>> observer) | |
| { | |
| // join the request if instantiated | |
| if(exchangesRequest != null) { | |
| return exchangesRequest.subscribe(observer); | |
| } | |
| exchangesRequest = PublishSubject.create(); | |
| exchangesRequest.subscribe(new Observer<List<Exchange>>() { | |
| @Override | |
| public void onCompleted() { | |
| } | |
| @Override | |
| public void onError(Throwable throwable) { | |
| } | |
| @Override | |
| public void onNext(List<Exchange> exchanges) { | |
| // Cache data in database | |
| } | |
| }); | |
| Subscription subscription = exchangesRequest.subscribe(observer); | |
| Observable.create(new Observable.OnSubscribe<List<Exchange>>() | |
| { | |
| @Override | |
| public void call(Subscriber<? super List<Exchange>> subscriber) | |
| { | |
| Schedulers.io().createWorker() | |
| .schedulePeriodically(new Action0() | |
| { | |
| @Override | |
| public void call() | |
| { | |
| getExchangesObservable().doOnNext(new Action1<List<Exchange>>() | |
| { | |
| @Override | |
| public void call(List<Exchange> exchanges) | |
| { | |
| subscriber.onNext(exchanges); | |
| } | |
| }).doOnError(new Action1<Throwable>() | |
| { | |
| @Override | |
| public void call(Throwable throwable) | |
| { | |
| if(throwable != null) | |
| subscriber.onError(throwable); | |
| } | |
| }).subscribe(); | |
| } | |
| }, 500, CHECK_EXCHANGE_DATA, TimeUnit.MILLISECONDS); | |
| } | |
| }) | |
| .subscribeOn(Schedulers.newThread()) | |
| .observeOn(AndroidSchedulers.mainThread()) | |
| .subscribe(exchangesRequest); | |
| return subscription; | |
| } | |
| private Observable<List<Exchange>> getExchangesObservable() | |
| { | |
| BitcoinAverage bitcoinAverage = provideBitcoinAverage(); | |
| return bitcoinAverage.exchanges(USD) | |
| } | |
| private BitcoinAverage provideBitcoinAverage() | |
| { | |
| RestAdapter restAdapter = new RestAdapter.Builder() | |
| .setEndpoint("https://api.bitcoinaverage.com") | |
| .build(); | |
| return restAdapter.create(BitcoinAverage.class); | |
| } | |
| private interface BitcoinAverage | |
| { | |
| @GET("/exchanges/{currency}") | |
| Observable<List<Exchange>> exchanges(@Path("currency") String currency); | |
| } | |
| public class Exchange | |
| { | |
| public String display_name; | |
| public String ask; | |
| public String bid; | |
| public String last; | |
| public String source; | |
| public float volume_btc; | |
| public float volume_percent; | |
| public String blue_bid; | |
| public String blue_ask; | |
| public String official_bid; | |
| public String official_ask; | |
| public String date; | |
| } | |
| } |
| public class ExchangeView extends FrameLayout | |
| { | |
| private Subscription subscription; | |
| private DataService dataService; | |
| public ExchangeView(Context context, AttributeSet attrs) | |
| { | |
| super(context, attrs); | |
| dataService = new DataServices(); | |
| } | |
| @Override | |
| protected void onFinishInflate() | |
| { | |
| super.onFinishInflate(); | |
| getExchangeData(); | |
| } | |
| @Override | |
| protected void onDetachedFromWindow() | |
| { | |
| super.onDetachedFromWindow(); | |
| if(subscription != null) | |
| subscription.unsubscribe(); | |
| } | |
| public void getExchangeData() | |
| { | |
| // should update exchanges on interval, handle errors, and be able to unsubscribe | |
| subscription = dataService.getExchangesTimer(new Observer<List<Exchange>>() | |
| { | |
| @Override | |
| public void onNext(List<Exchange> exchanges) { | |
| // TODO update exchanges on view | |
| } | |
| @Override | |
| public void onCompleted() { | |
| } | |
| @Override | |
| public void onError(Throwable throwable) { | |
| if (throwable instanceof RetrofitError) { | |
| if (((RetrofitError) throwable).isNetworkError()) { | |
| // TODO handle network error | |
| } else { | |
| // TODO handle service error | |
| } | |
| } | |
| } | |
| } | |
| } | |
| } |
Please note that the API returns a Retrofit "Response" that must be formatted into an Exchange. I have made it look as though the exchanges are returned as a ready-made list with no extra formatting, but in reality they do need it, so this won't run as is.
I have looked at the notes here (ReactiveX/RxJava#448), but it's not clear how to return the subscription back to the view, and the method outlined there for using a Scheduler is now deprecated in the latest RxJava:
Observable.create({ observer ->
Schedulers.newThread().schedulePeriodically({
observer.onNext("application-state-from-network");
}, 0, 1000, TimeUnit.MILLISECONDS);
}).take(10).subscribe({ v -> println(v) });
It's also hard to reverse engineer a lambda without being familiar with the original.
Added a getExchangesWorker method that uses a scheduled worker within an observable to hit the API periodically. This method seems to work until I encounter an error; then it stops emitting data.
This is what I have currently and also what I have tried when creating a subscription that has polling. You can see the regular subscription (getExchanges), the subscription using a timer (getExchangesTimer) and the subscription that uses a scheduler (getExchangesPeriodically).
The Timer seems to fire more than once and stops running on error. The Scheduler runs continuously (ignoring the time interval) and eventually causes a memory error in the application. Right now I am manually refreshing the data using a runner instead of trying to do this in RxJava.