// streamify.js
  1. var async = require('./async.js');
  // Public API
  var publicApi = {};

  publicApi.iterator = wrapIterator;
  publicApi.callback = wrapCallback;

  module.exports = publicApi;
  /**
   * Builds a `(item, key, cb)` iterator around the provided one,
   * swapping the job callback for a stream-aware version
   *
   * @this ReadableAsyncKit#
   * @param {function} iterator - iterator to wrap, either `(item, cb)` or `(item, key, cb)`
   * @returns {function} - wrapped iterator, hands back whatever the original iterator returns
   */
  function wrapIterator(iterator)
  {
    var stream = this;

    return function(item, key, cb)
    {
      var jobCallback = async(wrapIteratorCallback.call(stream, cb, key));

      // register the job's callback under its key
      stream.jobs[key] = jobCallback;

      // two declared arguments means the shortcut form (item, cb),
      // anything else is treated as the long form (item, key, cb)
      return iterator.length === 2
        ? iterator(item, jobCallback)
        : iterator(item, key, jobCallback);
    };
  }
  /**
   * Creates a final callback that routes through `finisher`
   * (bound to the stream) before the provided callback is reached
   *
   * @this ReadableAsyncKit#
   * @param {function} callback - final callback to wrap
   * @returns {function} - wrapped `(error, result)` function
   */
  function wrapCallback(callback)
  {
    var stream = this;

    return function wrapped(error, result)
    {
      return finisher.call(stream, error, result, callback);
    };
  }
  /**
   * Creates a job callback that goes through `streamer` the first time
   * it is invoked, and straight to the original callback on any later call
   *
   * @this ReadableAsyncKit#
   * @param {function} callback - callback to wrap
   * @param {number|string} key - iteration key, used to look the job up in `stream.jobs`
   * @returns {function} wrapped `(error, output)` callback
   */
  function wrapIteratorCallback(callback, key)
  {
    var stream = this;

    return function(error, output)
    {
      // job is still registered, so this is the first invocation
      if (key in stream.jobs)
      {
        // unregister the job before reporting it
        delete stream.jobs[key];

        return streamer.call(stream, error, {key: key, value: output}, callback);
      }

      // already reported, pass through untouched
      callback(error, output);
    };
  }
  /**
   * Reports an iterator result to the stream and then to the callback
   *
   * The first error seen is stored on the stream, the stream is paused
   * and an `error` event is emitted; in every other case the output
   * is pushed into the stream.
   *
   * @this ReadableAsyncKit#
   * @param {mixed} error - error response
   * @param {mixed} output - iterator output, its `value` property is what the callback receives
   * @param {function} callback - callback that expects iterator results
   */
  function streamer(error, output, callback)
  {
    var isFirstError = error && !this.error;

    if (isFirstError)
    {
      // remember the error and stop the flow before announcing it
      this.error = error;
      this.pause();
      this.emit('error', error);
    }
    else
    {
      // stream stuff
      this.push(output);
    }

    // back to original track,
    // original callback expects the bare value
    callback(error, output && output.value);
  }
  /**
   * Closes the stream on success and forwards the final results
   *
   * @this ReadableAsyncKit#
   * @param {mixed} error - error response
   * @param {mixed} output - final output, passed to the callback unchanged
   * @param {function} callback - callback that expects final results
   */
  function finisher(error, output, callback)
  {
    var succeeded = !error;

    // `null` marks the end of the stream,
    // errored runs are left without the end marker
    if (succeeded)
    {
      this.push(null);
    }

    // back to original track
    callback(error, output);
  }